diff options
author | Sergei Golubchik <sergii@pisem.net> | 2011-10-19 21:45:18 +0200 |
---|---|---|
committer | Sergei Golubchik <sergii@pisem.net> | 2011-10-19 21:45:18 +0200 |
commit | 76f0b94bb0b2994d639353530c5b251d0f1a204b (patch) | |
tree | 9ed50628aac34f89a37637bab2fc4915b86b5eb4 /sql/log.cc | |
parent | 4e46d8e5bff140f2549841167dc4b65a3c0a645d (diff) | |
parent | 5dc1a2231f55bacc9aaf0e24816f3d9c2ee1f21d (diff) | |
download | mariadb-git-76f0b94bb0b2994d639353530c5b251d0f1a204b.tar.gz |
merge with 5.3
sql/sql_insert.cc:
CREATE ... IF NOT EXISTS may do nothing, but
it is still not a failure. don't forget to my_ok it.
******
CREATE ... IF NOT EXISTS may do nothing, but
it is still not a failure. don't forget to my_ok it.
sql/sql_table.cc:
small cleanup
******
small cleanup
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 1595 |
1 files changed, 1284 insertions, 311 deletions
diff --git a/sql/log.cc b/sql/log.cc index e66ca32d560..2b248fa80ba 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -1,4 +1,5 @@ -/* Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved. +/* Copyright (c) 2000, 2011, Oracle and/or its affiliates. + Copyright (c) 2010-2011 Monty Program Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -50,6 +51,7 @@ #include "sql_plugin.h" #include "rpl_handler.h" +#include "debug_sync.h" /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 @@ -71,6 +73,38 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv); static int binlog_commit(handlerton *hton, THD *thd, bool all); static int binlog_rollback(handlerton *hton, THD *thd, bool all); static int binlog_prepare(handlerton *hton, THD *thd, bool all); +static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd); + +static LEX_STRING const write_error_msg= + { C_STRING_WITH_LEN("error writing to the binary log") }; + +static my_bool opt_optimize_thread_scheduling= TRUE; +ulong binlog_checksum_options; +#ifndef DBUG_OFF +static ulong opt_binlog_dbug_fsync_sleep= 0; +#endif + +mysql_mutex_t LOCK_prepare_ordered; +mysql_mutex_t LOCK_commit_ordered; + +static ulonglong binlog_status_var_num_commits; +static ulonglong binlog_status_var_num_group_commits; +static char binlog_snapshot_file[FN_REFLEN]; +static ulonglong binlog_snapshot_position; + +static SHOW_VAR binlog_status_vars_detail[]= +{ + {"commits", + (char *)&binlog_status_var_num_commits, SHOW_LONGLONG}, + {"group_commits", + (char *)&binlog_status_var_num_group_commits, SHOW_LONGLONG}, + {"snapshot_file", + (char *)&binlog_snapshot_file, SHOW_CHAR}, + {"snapshot_position", + (char *)&binlog_snapshot_position, SHOW_LONGLONG}, + {NullS, NullS, SHOW_LONG} +}; + /** purge logs, master and slave sides both, related error code @@ -148,59 +182,27 @@ sql_print_message_func sql_print_message_handlers[3] = sql_print_error }; -/** - Create the name of the log specified. - This method forms a new path + file name for the - log specified in @c name. - - @param[IN] buff Location for building new string. - @param[IN] name Name of the log file. - @param[IN] log_ext The extension for the log (e.g. .log). - - @returns Pointer to new string containing the name. +/** + Create the name of the log file + + @param[OUT] out a pointer to a new allocated name will go there + @param[IN] log_ext The extension for the file (e.g .log) + @param[IN] once whether to use malloc_once or a normal malloc. */ -char *make_log_name(char *buff, const char *name, const char* log_ext) +void make_default_log_name(char **out, const char* log_ext, bool once) { - strmake(buff, name, FN_REFLEN-5); - return fn_format(buff, buff, mysql_real_data_home, log_ext, - MYF(MY_UNPACK_FILENAME|MY_REPLACE_EXT)); -} - -/* - Helper class to hold a mutex for the duration of the - block. - - Eliminates the need for explicit unlocking of mutexes on, e.g., - error returns. On passing a null pointer, the sentry will not do - anything. - */ -class Mutex_sentry -{ -public: - Mutex_sentry(mysql_mutex_t *mutex) - : m_mutex(mutex) - { - if (m_mutex) - mysql_mutex_lock(mutex); - } - - ~Mutex_sentry() + char buff[FN_REFLEN+10]; + fn_format(buff, opt_log_basename, "", log_ext, MYF(MY_REPLACE_EXT)); + if (once) + *out= my_once_strdup(buff, MYF(MY_WME)); + else { - if (m_mutex) - mysql_mutex_unlock(m_mutex); -#ifndef DBUG_OFF - m_mutex= 0; -#endif + my_free(*out); + *out= my_strdup(buff, MYF(MY_WME)); } +} -private: - mysql_mutex_t *m_mutex; - - // It's not allowed to copy this object in any way - Mutex_sentry(Mutex_sentry const&); - void operator=(Mutex_sentry const&); -}; /* Helper classes to store non-transactional and transactional data @@ -422,6 +424,7 @@ public: ulong *param_ptr_binlog_stmt_cache_disk_use, ulong *param_ptr_binlog_cache_use, ulong *param_ptr_binlog_cache_disk_use) + : last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0) { stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size, param_ptr_binlog_stmt_cache_use, @@ -429,11 +432,20 @@ public: trx_cache.set_binlog_cache_info(param_max_binlog_cache_size, param_ptr_binlog_cache_use, param_ptr_binlog_cache_disk_use); + last_commit_pos_file[0]= 0; } - void reset_cache(binlog_cache_data* cache_data) + void reset(bool do_stmt, bool do_trx) { - cache_data->reset(); + if (do_stmt) + stmt_cache.reset(); + if (do_trx) + { + trx_cache.reset(); + using_xa= FALSE; + last_commit_pos_file[0]= 0; + last_commit_pos_offset= 0; + } } binlog_cache_data* get_binlog_cache_data(bool is_transactional) @@ -450,6 +462,23 @@ public: binlog_cache_data trx_cache; + /* + Binlog position for current transaction. + For START TRANSACTION WITH CONSISTENT SNAPSHOT, this is the binlog + position corresponding to the snapshot taken. During (and after) commit, + this is set to the binlog position corresponding to just after the + commit (so storage engines can store it in their transaction log). + */ + char last_commit_pos_file[FN_REFLEN]; + my_off_t last_commit_pos_offset; + + /* + Flag set true if this transaction is committed with log_xid() as part of + XA, false if not. + */ + bool using_xa; + my_xid xa_xid; + private: binlog_cache_mngr& operator=(const binlog_cache_mngr& info); @@ -552,7 +581,7 @@ void Log_to_csv_event_handler::cleanup() */ bool Log_to_csv_event_handler:: - log_general(THD *thd, time_t event_time, const char *user_host, + log_general(THD *thd, my_hrtime_t event_time, const char *user_host, uint user_host_len, int thread_id, const char *command_type, uint command_type_len, const char *sql_text, uint sql_text_len, @@ -629,8 +658,8 @@ bool Log_to_csv_event_handler:: DBUG_ASSERT(table->field[0]->type() == MYSQL_TYPE_TIMESTAMP); - ((Field_timestamp*) table->field[0])->store_timestamp((my_time_t) - event_time); + ((Field_timestamp*) table->field[0])->store_TIME( + hrtime_to_my_time(event_time), hrtime_sec_part(event_time)); /* do a write */ if (table->field[1]->store(user_host, user_host_len, client_cs) || @@ -694,7 +723,6 @@ err: log_slow() thd THD of the query current_time current timestamp - query_start_arg command start timestamp user_host the pointer to the string with user@host info user_host_len length of the user_host string. this is computed once and passed to all general log event handlers @@ -717,7 +745,7 @@ err: */ bool Log_to_csv_event_handler:: - log_slow(THD *thd, time_t current_time, time_t query_start_arg, + log_slow(THD *thd, my_hrtime_t current_time, const char *user_host, uint user_host_len, ulonglong query_utime, ulonglong lock_utime, bool is_command, const char *sql_text, uint sql_text_len) @@ -731,6 +759,11 @@ bool Log_to_csv_event_handler:: Open_tables_backup open_tables_backup; CHARSET_INFO *client_cs= thd->variables.character_set_client; bool save_time_zone_used; + long query_time= (long) min(query_utime/1000000, TIME_MAX_VALUE_SECONDS); + long lock_time= (long) min(lock_utime/1000000, TIME_MAX_VALUE_SECONDS); + long query_time_micro= (long) (query_utime % 1000000); + long lock_time_micro= (long) (lock_utime % 1000000); + DBUG_ENTER("Log_to_csv_event_handler::log_slow"); thd->push_internal_handler(& error_handler); @@ -767,45 +800,34 @@ bool Log_to_csv_event_handler:: /* store the time and user values */ DBUG_ASSERT(table->field[0]->type() == MYSQL_TYPE_TIMESTAMP); - ((Field_timestamp*) table->field[0])->store_timestamp((my_time_t) - current_time); + ((Field_timestamp*) table->field[0])->store_TIME( + hrtime_to_my_time(current_time), hrtime_sec_part(current_time)); if (table->field[1]->store(user_host, user_host_len, client_cs)) goto err; - if (query_start_arg) - { - longlong query_time= (longlong) (query_utime/1000000); - longlong lock_time= (longlong) (lock_utime/1000000); - /* - A TIME field can not hold the full longlong range; query_time or - lock_time may be truncated without warning here, if greater than - 839 hours (~35 days) - */ - MYSQL_TIME t; - t.neg= 0; + /* + A TIME field can not hold the full longlong range; query_time or + lock_time may be truncated without warning here, if greater than + 839 hours (~35 days) + */ + MYSQL_TIME t; + t.neg= 0; + + /* fill in query_time field */ + calc_time_from_sec(&t, query_time, query_time_micro); + if (table->field[2]->store_time(&t)) + goto err; + /* lock_time */ + calc_time_from_sec(&t, lock_time, lock_time_micro); + if (table->field[3]->store_time(&t)) + goto err; + /* rows_sent */ + if (table->field[4]->store((longlong) thd->sent_row_count, TRUE)) + goto err; + /* rows_examined */ + if (table->field[5]->store((longlong) thd->examined_row_count, TRUE)) + goto err; - /* fill in query_time field */ - calc_time_from_sec(&t, (long) min(query_time, (longlong) TIME_MAX_VALUE_SECONDS), 0); - if (table->field[2]->store_time(&t, MYSQL_TIMESTAMP_TIME)) - goto err; - /* lock_time */ - calc_time_from_sec(&t, (long) min(lock_time, (longlong) TIME_MAX_VALUE_SECONDS), 0); - if (table->field[3]->store_time(&t, MYSQL_TIMESTAMP_TIME)) - goto err; - /* rows_sent */ - if (table->field[4]->store((longlong) thd->sent_row_count, TRUE)) - goto err; - /* rows_examined */ - if (table->field[5]->store((longlong) thd->examined_row_count, TRUE)) - goto err; - } - else - { - table->field[2]->set_null(); - table->field[3]->set_null(); - table->field[4]->set_null(); - table->field[5]->set_null(); - } /* fill database field */ if (thd->db) { @@ -937,14 +959,14 @@ void Log_to_file_event_handler::init_pthread_objects() /** Wrapper around MYSQL_LOG::write() for slow log. */ bool Log_to_file_event_handler:: - log_slow(THD *thd, time_t current_time, time_t query_start_arg, + log_slow(THD *thd, my_hrtime_t current_time, const char *user_host, uint user_host_len, ulonglong query_utime, ulonglong lock_utime, bool is_command, const char *sql_text, uint sql_text_len) { Silence_log_table_errors error_handler; thd->push_internal_handler(&error_handler); - bool retval= mysql_slow_log.write(thd, current_time, query_start_arg, + bool retval= mysql_slow_log.write(thd, hrtime_to_my_time(current_time), user_host, user_host_len, query_utime, lock_utime, is_command, sql_text, sql_text_len); @@ -959,7 +981,7 @@ bool Log_to_file_event_handler:: */ bool Log_to_file_event_handler:: - log_general(THD *thd, time_t event_time, const char *user_host, + log_general(THD *thd, my_hrtime_t event_time, const char *user_host, uint user_host_len, int thread_id, const char *command_type, uint command_type_len, const char *sql_text, uint sql_text_len, @@ -967,7 +989,8 @@ bool Log_to_file_event_handler:: { Silence_log_table_errors error_handler; thd->push_internal_handler(&error_handler); - bool retval= mysql_log.write(event_time, user_host, user_host_len, + bool retval= mysql_log.write(hrtime_to_time(event_time), user_host, + user_host_len, thread_id, command_type, command_type_len, sql_text, sql_text_len); thd->pop_internal_handler(); @@ -1200,8 +1223,6 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length, if (*slow_log_handler_list) { - time_t current_time; - /* do not log slow queries from replication threads */ if (thd->slave_thread && !opt_log_slow_slave_statements) return 0; @@ -1221,16 +1242,12 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length, sctx->ip ? sctx->ip : "", "]", NullS) - user_host_buff); - current_time= my_time_possible_from_micro(current_utime); - if (thd->start_utime) - { - query_utime= (current_utime - thd->start_utime); - lock_utime= (thd->utime_after_lock - thd->start_utime); - } - else - { - query_utime= lock_utime= 0; - } + DBUG_ASSERT(thd->start_utime); + DBUG_ASSERT(thd->start_time); + query_utime= (current_utime - thd->start_utime); + lock_utime= (thd->utime_after_lock - thd->start_utime); + my_hrtime_t current_time= { hrtime_from_time(thd->start_time) + + thd->start_time_sec_part + query_utime }; if (!query) { @@ -1251,7 +1268,7 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length, } for (current_handler= slow_log_handler_list; *current_handler ;) - error= (*current_handler++)->log_slow(thd, current_time, thd->start_time, + error= (*current_handler++)->log_slow(thd, current_time, user_host_buff, user_host_len, query_utime, lock_utime, is_command, query, query_length) || error; @@ -1268,7 +1285,7 @@ bool LOGGER::general_log_write(THD *thd, enum enum_server_command command, Log_event_handler **current_handler= general_log_handler_list; char user_host_buff[MAX_USER_HOST_SIZE + 1]; uint user_host_len= 0; - time_t current_time; + my_hrtime_t current_time; DBUG_ASSERT(thd); @@ -1280,9 +1297,9 @@ bool LOGGER::general_log_write(THD *thd, enum enum_server_command command, } user_host_len= make_user_name(thd, user_host_buff); - current_time= my_time(0); + current_time= my_hrtime(); - mysql_audit_general_log(thd, current_time, + mysql_audit_general_log(thd, hrtime_to_time(current_time), user_host_buff, user_host_len, command_name[(uint) command].str, command_name[(uint) command].length, @@ -1587,6 +1604,7 @@ int binlog_init(void *p) binlog_hton->commit= binlog_commit; binlog_hton->rollback= binlog_rollback; binlog_hton->prepare= binlog_prepare; + binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot; binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN; return 0; } @@ -1602,48 +1620,69 @@ static int binlog_close_connection(handlerton *hton, THD *thd) return 0; } -/** +/* This function flushes a cache upon commit/rollback. - @param thd The thread whose transaction should be flushed - @param cache_data Pointer to the cache - @param end_ev The end event either commit/rollback - @param is_transactional The type of the cache: transactional or - non-transactional + SYNOPSIS + binlog_flush_cache() - @return - nonzero if an error pops up when flushing the cache. -*/ -static inline int -binlog_flush_cache(THD *thd, binlog_cache_data* cache_data, Log_event *end_evt, - bool is_transactional) + thd The thread whose transaction should be ended + cache_mngr Pointer to the binlog_cache_mngr to use + all True if the entire transaction should be ended, false if + only the statement transaction should be ended. + end_ev The end event to use (COMMIT, ROLLBACK, or commit XID) + using_stmt True if the statement cache should be flushed + using_trx True if the transaction cache should be flushed + + DESCRIPTION + + End the currently transaction or statement. The transaction can be either + a real transaction or a statement transaction. + + This can be to commit a transaction, with a COMMIT query event or an XA + commit XID event. But it can also be to rollback a transaction with a + ROLLBACK query event, used for rolling back transactions which also + contain updates to non-transactional tables. Or it can be a flush of + a statement cache. + */ +static int +binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, + Log_event *end_ev, bool all, bool using_stmt, + bool using_trx) { - DBUG_ENTER("binlog_flush_cache"); int error= 0; + DBUG_ENTER("binlog_flush_cache"); - if (!cache_data->empty()) + if ((using_stmt && !cache_mngr->stmt_cache.empty()) || + (using_trx && !cache_mngr->trx_cache.empty())) { - if (thd->binlog_flush_pending_rows_event(TRUE, is_transactional)) + if (using_stmt && thd->binlog_flush_pending_rows_event(TRUE, FALSE)) DBUG_RETURN(1); + if (using_trx && thd->binlog_flush_pending_rows_event(TRUE, TRUE)) + DBUG_RETURN(1); + /* Doing a commit or a rollback including non-transactional tables, i.e., ending a transaction where we might write the transaction cache to the binary log. We can always end the statement when ending a transaction since - transactions are not allowed inside stored functions. If they + transactions are not allowed inside stored functions. If they were, we would have to ensure that we're not ending a statement inside a stored function. */ - error= mysql_bin_log.write(thd, &cache_data->cache_log, end_evt, - cache_data->has_incident()); + error= mysql_bin_log.write_transaction_to_binlog(thd, cache_mngr, + end_ev, all, + using_stmt, using_trx); } - cache_data->reset(); + cache_mngr->reset(using_stmt, using_trx); - DBUG_ASSERT(cache_data->empty()); + DBUG_ASSERT((!using_stmt || cache_mngr->stmt_cache.empty()) && + (!using_trx || cache_mngr->trx_cache.empty())); DBUG_RETURN(error); } + /** This function flushes the stmt-cache upon commit. @@ -1654,13 +1693,12 @@ binlog_flush_cache(THD *thd, binlog_cache_data* cache_data, Log_event *end_evt, nonzero if an error pops up when flushing the cache. */ static inline int -binlog_commit_flush_stmt_cache(THD *thd, +binlog_commit_flush_stmt_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), - FALSE, FALSE, TRUE, 0); - return (binlog_flush_cache(thd, &cache_mngr->stmt_cache, &end_evt, - FALSE)); + FALSE, TRUE, TRUE, 0); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE)); } /** @@ -1673,12 +1711,11 @@ binlog_commit_flush_stmt_cache(THD *thd, nonzero if an error pops up when flushing the cache. */ static inline int -binlog_commit_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr) +binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), - TRUE, FALSE, TRUE, 0); - return (binlog_flush_cache(thd, &cache_mngr->trx_cache, &end_evt, - TRUE)); + TRUE, TRUE, TRUE, 0); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); } /** @@ -1691,12 +1728,12 @@ binlog_commit_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr) nonzero if an error pops up when flushing the cache. */ static inline int -binlog_rollback_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr) +binlog_rollback_flush_trx_cache(THD *thd, bool all, + binlog_cache_mngr *cache_mngr) { Query_log_event end_evt(thd, STRING_WITH_LEN("ROLLBACK"), - TRUE, FALSE, TRUE, 0); - return (binlog_flush_cache(thd, &cache_mngr->trx_cache, &end_evt, - TRUE)); + TRUE, TRUE, TRUE, 0); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); } /** @@ -1710,12 +1747,26 @@ binlog_rollback_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr) nonzero if an error pops up when flushing the cache. */ static inline int -binlog_commit_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, - my_xid xid) +binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr, + bool all, my_xid xid) { - Xid_log_event end_evt(thd, xid); - return (binlog_flush_cache(thd, &cache_mngr->trx_cache, &end_evt, - TRUE)); + if (xid) + { + Xid_log_event end_evt(thd, xid, TRUE); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); + } + else + { + /* + Empty xid occurs in XA COMMIT ... ONE PHASE. + In this case, we do not have a MySQL xid for the transaction, and the + external XA transaction coordinator will have to handle recovery if + needed. So we end the transaction with a plain COMMIT query event. + */ + Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), + TRUE, TRUE, TRUE, 0); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); + } } /** @@ -1754,11 +1805,11 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) if (ending_trans(thd, all)) { if (cache_mngr->trx_cache.has_incident()) - error= mysql_bin_log.write_incident(thd, TRUE); + error= mysql_bin_log.write_incident(thd); thd->clear_binlog_table_maps(); - cache_mngr->reset_cache(&cache_mngr->trx_cache); + cache_mngr->reset(false, true); } /* If rolling back a statement in a transaction, we truncate the @@ -1777,7 +1828,7 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) do nothing. just pretend we can do 2pc, so that MySQL won't switch to 1pc. - real work will be done in MYSQL_BIN_LOG::log_xid() + real work will be done in MYSQL_BIN_LOG::log_and_order() */ return 0; } @@ -1810,7 +1861,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) if (!cache_mngr->stmt_cache.empty()) { - error= binlog_commit_flush_stmt_cache(thd, cache_mngr); + error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); } if (cache_mngr->trx_cache.empty()) @@ -1818,7 +1869,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) /* we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() */ - cache_mngr->reset_cache(&cache_mngr->trx_cache); + cache_mngr->reset(false, true); DBUG_RETURN(error); } @@ -1829,7 +1880,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) Otherwise, we accumulate the changes. */ if (!error && ending_trans(thd, all)) - error= binlog_commit_flush_trx_cache(thd, cache_mngr); + error= binlog_commit_flush_trx_cache(thd, all, cache_mngr); /* This is part of the stmt rollback. @@ -1868,12 +1919,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) */ if (cache_mngr->stmt_cache.has_incident()) { - error= mysql_bin_log.write_incident(thd, TRUE); - cache_mngr->reset_cache(&cache_mngr->stmt_cache); + error= mysql_bin_log.write_incident(thd); + cache_mngr->reset(true, false); } else if (!cache_mngr->stmt_cache.empty()) { - error= binlog_commit_flush_stmt_cache(thd, cache_mngr); + error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); } if (cache_mngr->trx_cache.empty()) @@ -1881,7 +1932,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) /* we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() */ - cache_mngr->reset_cache(&cache_mngr->trx_cache); + cache_mngr->reset(false, true); DBUG_RETURN(error); } @@ -1921,7 +1972,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) (trans_has_updated_non_trans_table(thd) && ending_single_stmt_trans(thd,all) && thd->variables.binlog_format == BINLOG_FORMAT_MIXED))) - error= binlog_rollback_flush_trx_cache(thd, cache_mngr); + error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr); /* Truncate the cache if: . aborting a single or multi-statement transaction or; @@ -2038,7 +2089,7 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) log_query.append("`")) DBUG_RETURN(1); int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); - Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(), + Query_log_event qinfo(thd, log_query.ptr(), log_query.length(), TRUE, FALSE, TRUE, errcode); DBUG_RETURN(mysql_bin_log.write(&qinfo)); } @@ -2062,7 +2113,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) log_query.append("`")) DBUG_RETURN(1); int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED); - Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(), + Query_log_event qinfo(thd, log_query.ptr(), log_query.length(), TRUE, FALSE, TRUE, errcode); DBUG_RETURN(mysql_bin_log.write(&qinfo)); } @@ -2190,6 +2241,7 @@ static int find_uniq_filename(char *name) char *start, *end; int error= 0; DBUG_ENTER("find_uniq_filename"); + LINT_INIT(number); length= dirname_part(buff, name, &buf_length); start= name + length; @@ -2648,7 +2700,6 @@ err: thd THD of the query current_time current timestamp - query_start_arg command start timestamp user_host the pointer to the string with user@host info user_host_len length of the user_host string. this is computed once and passed to all general log event handlers @@ -2670,7 +2721,7 @@ err: */ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time, - time_t query_start_arg, const char *user_host, + const char *user_host, uint user_host_len, ulonglong query_utime, ulonglong lock_utime, bool is_command, const char *sql_text, uint sql_text_len) @@ -2847,8 +2898,12 @@ const char *MYSQL_LOG::generate_name(const char *log_name, MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) :bytes_written(0), prepared_xids(0), file_id(1), open_count(1), need_start_event(TRUE), + group_commit_queue(0), group_commit_queue_busy(FALSE), + num_commits(0), num_group_commits(0), sync_period_ptr(sync_period), is_relay_log(0), signal_cnt(0), + checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF), + relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), description_event_for_exec(0), description_event_for_queue(0) { /* @@ -2896,7 +2951,9 @@ void MYSQL_BIN_LOG::init_pthread_objects() { MYSQL_LOG::init_pthread_objects(); mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW); + mysql_mutex_setflags(&LOCK_index, MYF_NO_DEADLOCK_DETECTION); mysql_cond_init(m_key_update_cond, &update_cond, 0); + mysql_cond_init(m_key_COND_queue_busy, &COND_queue_busy, 0); } @@ -3081,7 +3138,19 @@ bool MYSQL_BIN_LOG::open(const char *log_name, as we won't be able to reset it later */ if (io_cache_type == WRITE_CACHE) - s.flags|= LOG_EVENT_BINLOG_IN_USE_F; + s.flags |= LOG_EVENT_BINLOG_IN_USE_F; + s.checksum_alg= is_relay_log ? + /* relay-log */ + /* inherit master's A descriptor if one has been received */ + (relay_log_checksum_alg= + (relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ? + relay_log_checksum_alg : + /* otherwise use slave's local preference of RL events verification */ + (opt_slave_sql_verify_checksum == 0) ? + (uint8) BINLOG_CHECKSUM_ALG_OFF : (uint8) binlog_checksum_options): + /* binlog */ + (uint8) binlog_checksum_options; + DBUG_ASSERT(s.checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); if (!s.is_valid()) goto err; s.dont_set_created= null_created_arg; @@ -3123,6 +3192,11 @@ bool MYSQL_BIN_LOG::open(const char *log_name, if (flush_io_cache(&log_file) || mysql_file_sync(log_file.file, MYF(MY_WME))) goto err; + mysql_mutex_lock(&LOCK_commit_ordered); + strmake(last_commit_pos_file, log_file_name, + sizeof(last_commit_pos_file)-1); + last_commit_pos_offset= my_b_tell(&log_file); + mysql_mutex_unlock(&LOCK_commit_ordered); if (write_file_name_to_index_file) { @@ -3403,6 +3477,13 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) ha_reset_logs(thd); /* + We need to get both locks to be sure that no one is trying to + write to the index log file. + */ + mysql_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_index); + + /* The following mutex is needed to ensure that no threads call 'delete thd' as we would then risk missing a 'rollback' from this thread. If the transaction involved MyISAM tables, it should go @@ -3410,13 +3491,6 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) */ mysql_mutex_lock(&LOCK_thread_count); - /* - We need to get both locks to be sure that no one is trying to - write to the index log file. - */ - mysql_mutex_lock(&LOCK_log); - mysql_mutex_lock(&LOCK_index); - /* Save variables so that we can reopen the log */ save_name=name; name=0; // Protect against free @@ -4257,8 +4331,15 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) We log the whole file name for log file as the user may decide to change base names at some point. */ - Rotate_log_event r(new_name+dirname_length(new_name), - 0, LOG_EVENT_OFFSET, is_relay_log ? Rotate_log_event::RELAY_LOG : 0); + Rotate_log_event r(new_name+dirname_length(new_name), 0, LOG_EVENT_OFFSET, + is_relay_log ? Rotate_log_event::RELAY_LOG : 0); + /* + The current relay-log's closing Rotate event must have checksum + value computed with an algorithm of the last relay-logged FD event. + */ + if (is_relay_log) + r.checksum_alg= relay_log_checksum_alg; + DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) || (error= r.write(&log_file))) { @@ -4279,7 +4360,12 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) old_name=name; name=0; // Don't free name close(LOG_CLOSE_TO_BE_OPENED | LOG_CLOSE_INDEX); - + if (log_type == LOG_BIN && checksum_alg_reset != BINLOG_CHECKSUM_ALG_UNDEF) + { + DBUG_ASSERT(!is_relay_log); + DBUG_ASSERT(binlog_checksum_options != checksum_alg_reset); + binlog_checksum_options= checksum_alg_reset; + } /* Note that at this point, log_state != LOG_CLOSED (important for is_open()). */ @@ -4421,6 +4507,10 @@ bool MYSQL_BIN_LOG::flush_and_sync(bool *synced) err= mysql_file_sync(fd, MYF(MY_WME)); if (synced) *synced= 1; +#ifndef DBUG_OFF + if (opt_binlog_dbug_fsync_sleep > 0) + my_sleep(opt_binlog_dbug_fsync_sleep); +#endif } return err; } @@ -4681,12 +4771,34 @@ void THD::binlog_set_stmt_begin() { cache_mngr->trx_cache.set_prev_position(pos); } +static int +binlog_start_consistent_snapshot(handlerton *hton, THD *thd) +{ + int err= 0; + DBUG_ENTER("binlog_start_consistent_snapshot"); + + thd->binlog_setup_trx_data(); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + /* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */ + strmake(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file, + sizeof(cache_mngr->last_commit_pos_file)-1); + cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset; + + trans_register_ha(thd, TRUE, hton); + + DBUG_RETURN(err); +} /** This function writes a table map to the binary log. Note that in order to keep the signature uniform with related methods, we use a redundant parameter to indicate whether a transactional table was changed or not. + + If with_annotate != NULL and + *with_annotate = TRUE write also Annotate_rows before the table map. @param table a pointer to the table. @param is_transactional @c true indicates a transactional table, @@ -4694,7 +4806,8 @@ void THD::binlog_set_stmt_begin() { @return nonzero if an error pops up when writing the table map event. */ -int THD::binlog_write_table_map(TABLE *table, bool is_transactional) +int THD::binlog_write_table_map(TABLE *table, bool is_transactional, + my_bool *with_annotate) { int error; DBUG_ENTER("THD::binlog_write_table_map"); @@ -4717,6 +4830,14 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional) IO_CACHE *file= cache_mngr->get_binlog_cache_log(use_trans_cache(this, is_transactional)); + if (with_annotate && *with_annotate) + { + Annotate_rows_log_event anno(current_thd, is_transactional); + /* Annotate event should be written not more than once */ + *with_annotate= 0; + if ((error= anno.write(file))) + DBUG_RETURN(error); + } if ((error= the_event.write(file))) DBUG_RETURN(error); @@ -4871,10 +4992,12 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, } /** - Write an event to the binary log. + Write an event to the binary log. If with_annotate != NULL and + *with_annotate = TRUE write also Annotate_rows before the event + (this should happen only if the event is a Table_map). */ -bool MYSQL_BIN_LOG::write(Log_event *event_info) +bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) { THD *thd= event_info->thd; bool error= 1; @@ -4962,9 +5085,22 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) of the SQL command. If row-based binlogging, Insert_id, Rand and other kind of "setting context" events are not needed. */ + + if (with_annotate && *with_annotate) + { + DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT); + Annotate_rows_log_event anno(thd, event_info->cache_type); + /* Annotate event should be written not more than once */ + *with_annotate= 0; + if (anno.write(file)) + goto err; + } + + if (thd) { if (!thd->is_current_stmt_binlog_format_row()) { + if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt) { Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, @@ -5026,6 +5162,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) err: if (event_info->use_direct_logging()) { + my_off_t offset= my_b_tell(file); + if (!error) { bool synced; @@ -5034,7 +5172,7 @@ err: goto unlock; status_var_add(thd->status_var.binlog_bytes_written, - my_b_tell(file) - my_org_b_tell); + offset - my_org_b_tell); if ((error= RUN_HOOK(binlog_storage, after_flush, (thd, log_file_name, file->pos_in_file, synced)))) @@ -5045,7 +5183,15 @@ err: signal_update(); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } + unlock: + /* + Take mutex to protect against a reader seeing partial writes of 64-bit + offset on 32-bit CPUs. + */ + mysql_mutex_lock(&LOCK_commit_ordered); + last_commit_pos_offset= offset; + mysql_mutex_unlock(&LOCK_commit_ordered); mysql_mutex_unlock(&LOCK_log); } @@ -5162,12 +5308,14 @@ int MYSQL_BIN_LOG::rotate_and_purge(uint flags) We give it a shot and try to write an incident event anyway to the current log. */ - if (!write_incident(current_thd, FALSE)) + if (!write_incident_already_locked(current_thd)) flush_and_sync(0); #ifdef HAVE_REPLICATION check_purge= true; #endif + if (flags & RP_BINLOG_CHECKSUM_ALG_CHANGE) + checksum_alg_reset= BINLOG_CHECKSUM_ALG_UNDEF; // done } if (!(flags & RP_LOCK_LOG_IS_ALREADY_LOCKED)) mysql_mutex_unlock(&LOCK_log); @@ -5196,6 +5344,33 @@ uint MYSQL_BIN_LOG::next_file_id() } +/** + Calculate checksum of possibly a part of an event containing at least + the whole common header. + + @param buf the pointer to trans cache's buffer + @param off the offset of the beginning of the event in the buffer + @param event_len no-checksum length of the event + @param length the current size of the buffer + + @param crc [in-out] the checksum + + Event size in incremented by @c BINLOG_CHECKSUM_LEN. + + @return 0 or number of unprocessed yet bytes of the event excluding + the checksum part. +*/ + static ulong fix_log_event_crc(uchar *buf, uint off, uint event_len, + uint length, ha_checksum *crc) +{ + ulong ret; + uchar *event_begin= buf + off; + + ret= length >= off + event_len ? 0 : off + event_len - length; + *crc= my_checksum(*crc, event_begin, event_len - ret); + return ret; +} + /* Write the contents of a cache to the binary log. @@ -5203,24 +5378,33 @@ uint MYSQL_BIN_LOG::next_file_id() write_cache() thd Current_thread cache Cache to write to the binary log - lock_log True if the LOCK_log mutex should be aquired, false otherwise - sync_log True if the log should be flushed and synced DESCRIPTION Write the contents of the cache to the binary log. The cache will be reset as a READ_CACHE to be able to read the contents from it. - */ -int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log, - bool sync_log) -{ - Mutex_sentry sentry(lock_log ? &LOCK_log : NULL); + Reading from the trans cache with possible (per @c binlog_checksum_options) + adding checksum value and then fixing the length and the end_log_pos of + events prior to fill in the binlog cache. +*/ +int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) +{ + mysql_mutex_assert_owner(&LOCK_log); if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) return ER_ERROR_ON_WRITE; uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; + ulong remains= 0; // part of unprocessed yet netto length of the event long val; + ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t uchar header[LOG_EVENT_HEADER_LEN]; + ha_checksum crc= 0, crc_0= 0; // assignments to keep compiler happy + my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF); + uchar buf[BINLOG_CHECKSUM_LEN]; + + // while there is just one alg the following must hold: + DBUG_ASSERT(!do_checksum || + binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32); /* The events in the buffer have incorrect end_log_pos data @@ -5238,6 +5422,8 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log, group= (uint)my_b_tell(&log_file); hdr_offs= carry= 0; + if (do_checksum) + crc= crc_0= my_checksum(0L, NULL, 0); do { @@ -5250,12 +5436,21 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log, DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN); /* assemble both halves */ - memcpy(&header[carry], (char *)cache->read_pos, LOG_EVENT_HEADER_LEN - carry); + memcpy(&header[carry], (char *)cache->read_pos, + LOG_EVENT_HEADER_LEN - carry); /* fix end_log_pos */ - val= uint4korr(&header[LOG_POS_OFFSET]) + group; + val= uint4korr(&header[LOG_POS_OFFSET]) + group + + (end_log_pos_inc+= (do_checksum ? BINLOG_CHECKSUM_LEN : 0)); int4store(&header[LOG_POS_OFFSET], val); + if (do_checksum) + { + ulong len= uint4korr(&header[EVENT_LEN_OFFSET]); + /* fix len */ + int4store(&header[EVENT_LEN_OFFSET], len + BINLOG_CHECKSUM_LEN); + } + /* write the first half of the split header */ if (my_b_write(&log_file, header, carry)) return ER_ERROR_ON_WRITE; @@ -5265,11 +5460,20 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log, copy fixed second half of header to cache so the correct version will be written later. */ - memcpy((char *)cache->read_pos, &header[carry], LOG_EVENT_HEADER_LEN - carry); + memcpy((char *)cache->read_pos, &header[carry], + LOG_EVENT_HEADER_LEN - carry); /* next event header at ... */ - hdr_offs = uint4korr(&header[EVENT_LEN_OFFSET]) - carry; + hdr_offs= uint4korr(&header[EVENT_LEN_OFFSET]) - carry - + (do_checksum ? BINLOG_CHECKSUM_LEN : 0); + if (do_checksum) + { + DBUG_ASSERT(crc == crc_0 && remains == 0); + crc= my_checksum(crc, header, carry); + remains= uint4korr(header + EVENT_LEN_OFFSET) - carry - + BINLOG_CHECKSUM_LEN; + } carry= 0; } @@ -5284,6 +5488,25 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log, very next iteration, just "eventually"). */ + /* crc-calc the whole buffer */ + if (do_checksum && hdr_offs >= length) + { + + DBUG_ASSERT(remains != 0 && crc != crc_0); + + crc= my_checksum(crc, cache->read_pos, length); + remains -= length; + if (my_b_write(&log_file, cache->read_pos, length)) + return ER_ERROR_ON_WRITE; + if (remains == 0) + { + int4store(buf, crc); + if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) + return ER_ERROR_ON_WRITE; + crc= crc_0; + } + } + while (hdr_offs < length) { /* @@ -5291,6 +5514,26 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log, we get the rest. */ + if (do_checksum) + { + if (remains != 0) + { + /* + finish off with remains of the last event that crawls + from previous into the current buffer + */ + DBUG_ASSERT(crc != crc_0); + crc= my_checksum(crc, cache->read_pos, hdr_offs); + int4store(buf, crc); + remains -= hdr_offs; + DBUG_ASSERT(remains == 0); + if (my_b_write(&log_file, cache->read_pos, hdr_offs) || + my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) + return ER_ERROR_ON_WRITE; + crc= crc_0; + } + } + if (hdr_offs + LOG_EVENT_HEADER_LEN > length) { carry= length - hdr_offs; @@ -5300,17 +5543,38 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log, else { /* we've got a full event-header, and it came in one piece */ - - uchar *log_pos= (uchar *)cache->read_pos + hdr_offs + LOG_POS_OFFSET; + uchar *ev= (uchar *)cache->read_pos + hdr_offs; + uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len + uchar *log_pos= ev + LOG_POS_OFFSET; /* fix end_log_pos */ - val= uint4korr(log_pos) + group; + val= uint4korr(log_pos) + group + + (end_log_pos_inc += (do_checksum ? BINLOG_CHECKSUM_LEN : 0)); int4store(log_pos, val); + /* fix CRC */ + if (do_checksum) + { + /* fix length */ + int4store(ev + EVENT_LEN_OFFSET, event_len + BINLOG_CHECKSUM_LEN); + remains= fix_log_event_crc(cache->read_pos, hdr_offs, event_len, + length, &crc); + if (my_b_write(&log_file, ev, + remains == 0 ? event_len : length - hdr_offs)) + return ER_ERROR_ON_WRITE; + if (remains == 0) + { + int4store(buf, crc); + if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) + return ER_ERROR_ON_WRITE; + crc= crc_0; // crc is complete + } + } + /* next event header at ... */ - log_pos= (uchar *)cache->read_pos + hdr_offs + EVENT_LEN_OFFSET; - hdr_offs += uint4korr(log_pos); + hdr_offs += event_len; // incr by the netto len + DBUG_ASSERT(!do_checksum || remains == 0 || hdr_offs >= length); } } @@ -5326,17 +5590,19 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log, } /* Write data to the binary log file */ - if (my_b_write(&log_file, cache->read_pos, length)) - return ER_ERROR_ON_WRITE; + DBUG_EXECUTE_IF("fail_binlog_write_1", + errno= 28; return ER_ERROR_ON_WRITE;); + if (!do_checksum) + if (my_b_write(&log_file, cache->read_pos, length)) + return ER_ERROR_ON_WRITE; status_var_add(thd->status_var.binlog_bytes_written, length); cache->read_pos=cache->read_end; // Mark buffer used up } while ((length= my_b_fill(cache))); DBUG_ASSERT(carry == 0); - - if (sync_log) - return flush_and_sync(0); + DBUG_ASSERT(!do_checksum || remains == 0); + DBUG_ASSERT(!do_checksum || crc == crc_0); return 0; // All OK } @@ -5370,31 +5636,50 @@ int query_error_code(THD *thd, bool not_killed) return error; } -bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock) + +bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd) { uint error= 0; - DBUG_ENTER("MYSQL_BIN_LOG::write_incident"); - - if (!is_open()) - DBUG_RETURN(error); - - LEX_STRING const write_error_msg= - { C_STRING_WITH_LEN("error writing to the binary log") }; + DBUG_ENTER("MYSQL_BIN_LOG::write_incident_already_locked"); Incident incident= INCIDENT_LOST_EVENTS; Incident_log_event ev(thd, incident, write_error_msg); - if (lock) - mysql_mutex_lock(&LOCK_log); - error= ev.write(&log_file); - status_var_add(thd->status_var.binlog_bytes_written, ev.data_written); - if (lock) + + if (likely(is_open())) + { + error= ev.write(&log_file); + status_var_add(thd->status_var.binlog_bytes_written, ev.data_written); + } + + DBUG_RETURN(error); +} + + +bool MYSQL_BIN_LOG::write_incident(THD *thd) +{ + uint error= 0; + my_off_t offset; + DBUG_ENTER("MYSQL_BIN_LOG::write_incident"); + + mysql_mutex_lock(&LOCK_log); + if (likely(is_open())) { - if (!error && !(error= flush_and_sync(0))) + if (!(error= write_incident_already_locked(thd)) && + !(error= flush_and_sync(0))) { signal_update(); error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } - mysql_mutex_unlock(&LOCK_log); + offset= my_b_tell(&log_file); + /* + Take mutex to protect against a reader seeing partial writes of 64-bit + offset on 32-bit CPUs. + */ + mysql_mutex_lock(&LOCK_commit_ordered); + last_commit_pos_offset= offset; + mysql_mutex_unlock(&LOCK_commit_ordered); } + mysql_mutex_unlock(&LOCK_log); + DBUG_RETURN(error); } @@ -5422,111 +5707,425 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock) 'cache' needs to be reinitialized after this functions returns. */ -bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event, - bool incident) +bool +MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, + binlog_cache_mngr *cache_mngr, + Log_event *end_ev, bool all, + bool using_stmt_cache, + bool using_trx_cache) +{ + group_commit_entry entry; + DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); + + entry.thd= thd; + entry.cache_mngr= cache_mngr; + entry.error= 0; + entry.all= all; + entry.using_stmt_cache= using_stmt_cache; + entry.using_trx_cache= using_trx_cache; + + /* + Log "BEGIN" at the beginning of every transaction. Here, a transaction is + either a BEGIN..COMMIT block or a single statement in autocommit mode. + + Create the necessary events here, where we have the correct THD (and + thread context). + + Due to group commit the actual writing to binlog may happen in a different + thread. + */ + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE, + TRUE, 0); + entry.begin_event= &qinfo; + entry.end_event= end_ev; + if (cache_mngr->stmt_cache.has_incident() || + cache_mngr->trx_cache.has_incident()) + { + Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg); + entry.incident_event= &inc_ev; + DBUG_RETURN(write_transaction_to_binlog_events(&entry)); + } + else + { + entry.incident_event= NULL; + DBUG_RETURN(write_transaction_to_binlog_events(&entry)); + } +} + +bool +MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) { - DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)"); + /* + To facilitate group commit for the binlog, we first queue up ourselves in + the group commit queue. Then the first thread to enter the queue waits for + the LOCK_log mutex, and commits for everyone in the queue once it gets the + lock. Any other threads in the queue just wait for the first one to finish + the commit and wake them up. + */ + + entry->thd->clear_wakeup_ready(); + mysql_mutex_lock(&LOCK_prepare_ordered); + group_commit_entry *orig_queue= group_commit_queue; + entry->next= orig_queue; + group_commit_queue= entry; + + if (entry->cache_mngr->using_xa) + { + DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered"); + run_prepare_ordered(entry->thd, entry->all); + DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered"); + } + mysql_mutex_unlock(&LOCK_prepare_ordered); + DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered"); + + /* + The first in the queue handle group commit for all; the others just wait + to be signalled when group commit is done. + */ + if (orig_queue != NULL) + entry->thd->wait_for_wakeup_ready(); + else + trx_group_commit_leader(entry); + + if (!opt_optimize_thread_scheduling) + { + /* For the leader, trx_group_commit_leader() already took the lock. */ + if (orig_queue != NULL) + mysql_mutex_lock(&LOCK_commit_ordered); + + DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered"); + ++num_commits; + if (entry->cache_mngr->using_xa && !entry->error) + run_commit_ordered(entry->thd, entry->all); + + group_commit_entry *next= entry->next; + if (!next) + { + group_commit_queue_busy= FALSE; + mysql_cond_signal(&COND_queue_busy); + DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered"); + } + mysql_mutex_unlock(&LOCK_commit_ordered); + + if (next) + { + next->thd->signal_wakeup_ready(); + } + } + + if (likely(!entry->error)) + return 0; + + switch (entry->error) + { + case ER_ERROR_ON_WRITE: + my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, entry->commit_errno); + break; + case ER_ERROR_ON_READ: + my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH), + entry->error_cache->file_name, entry->commit_errno); + break; + default: + /* + There are not (and should not be) any errors thrown not covered above. + But just in case one is added later without updating the above switch + statement, include a catch-all. + */ + my_printf_error(entry->error, + "Error writing transaction to binary log: %d", + MYF(ME_NOREFRESH), entry->error); + } + + /* + Since we return error, this transaction XID will not be committed, so + we need to mark it as not needed for recovery (unlog() is not called + for a transaction if log_xid() fails). + */ + if (entry->cache_mngr->using_xa && entry->cache_mngr->xa_xid) + mark_xid_done(); + + return 1; +} + +/* + Do binlog group commit as the lead thread. + + This must be called when this statement/transaction is queued at the start of + the group_commit_queue. It will wait to obtain the LOCK_log mutex, then group + commit all the transactions in the queue (more may have entered while waiting + for LOCK_log). After commit is done, all other threads in the queue will be + signalled. + + */ +void +MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) +{ + uint xid_count= 0; + my_off_t commit_offset; + group_commit_entry *current; + group_commit_entry *last_in_queue; + DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader"); + LINT_INIT(commit_offset); + + /* + Lock the LOCK_log(), and once we get it, collect any additional writes + that queued up while we were waiting. + */ mysql_mutex_lock(&LOCK_log); + DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log"); + + mysql_mutex_lock(&LOCK_prepare_ordered); + current= group_commit_queue; + group_commit_queue= NULL; + mysql_mutex_unlock(&LOCK_prepare_ordered); + /* As the queue is in reverse order of entering, reverse it. */ + group_commit_entry *queue= NULL; + last_in_queue= current; + while (current) + { + group_commit_entry *next= current->next; + current->next= queue; + queue= current; + current= next; + } + DBUG_ASSERT(leader == queue /* the leader should be first in queue */); + + /* Now we have in queue the list of transactions to be committed in order. */ DBUG_ASSERT(is_open()); if (likely(is_open())) // Should always be true { /* - We only bother to write to the binary log if there is anything - to write. - */ - if (my_b_tell(cache) > 0) + Commit every transaction in the queue. + + Note that we are doing this in a different thread than the one running + the transaction! So we are limited in the operations we can do. In + particular, we cannot call my_error() on behalf of a transaction, as + that obtains the THD from thread local storage. Instead, we must set + current->error and let the thread do the error reporting itself once + we wake it up. + */ + for (current= queue; current != NULL; current= current->next) { + binlog_cache_mngr *cache_mngr= current->cache_mngr; + /* - Log "BEGIN" at the beginning of every transaction. Here, a - transaction is either a BEGIN..COMMIT block or a single - statement in autocommit mode. + We already checked before that at least one cache is non-empty; if both + are empty we would have skipped calling into here. */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE, TRUE, 0); - if (qinfo.write(&log_file)) - goto err; + DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty()); - status_var_add(thd->status_var.binlog_bytes_written, qinfo.data_written); + current->error= write_transaction_or_stmt(current); - DBUG_EXECUTE_IF("crash_before_writing_xid", - { - if ((write_error= write_cache(thd, cache, FALSE, - TRUE))) - DBUG_PRINT("info", ("error writing binlog cache: %d", - write_error)); - DBUG_PRINT("info", ("crashing before writing xid")); - DBUG_SUICIDE(); - }); - - if ((write_error= write_cache(thd, cache, FALSE, FALSE))) - goto err; + strmake(cache_mngr->last_commit_pos_file, log_file_name, + sizeof(cache_mngr->last_commit_pos_file)-1); + commit_offset= my_b_write_tell(&log_file); + cache_mngr->last_commit_pos_offset= commit_offset; + if (cache_mngr->using_xa && cache_mngr->xa_xid) + xid_count++; + } - if (commit_event) + bool synced= 0; + if (flush_and_sync(&synced)) + { + for (current= queue; current != NULL; current= current->next) { - if (commit_event->write(&log_file)) - goto err; - status_var_add(thd->status_var.binlog_bytes_written, - commit_event->data_written); + if (!current->error) + { + current->error= ER_ERROR_ON_WRITE; + current->commit_errno= errno; + current->error_cache= NULL; + } } - - if (incident && write_incident(thd, FALSE)) - goto err; - - bool synced= 0; - if (flush_and_sync(&synced)) - goto err; - DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_SUICIDE();); - if (cache->error) // Error on read + } + else + { + bool any_error= false; + bool all_error= true; + for (current= queue; current != NULL; current= current->next) { - sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno); - write_error=1; // Don't give more errors - goto err; + if (!current->error && + RUN_HOOK(binlog_storage, after_flush, + (current->thd, log_file_name, log_file.pos_in_file, synced))) + { + current->error= ER_ERROR_ON_WRITE; + current->commit_errno= -1; + current->error_cache= NULL; + any_error= true; + } + else + all_error= false; } - if (RUN_HOOK(binlog_storage, after_flush, - (thd, log_file_name, log_file.pos_in_file, synced))) - { + if (any_error) sql_print_error("Failed to run 'after_flush' hooks"); - write_error=1; - goto err; - } - - signal_update(); + if (!all_error) + signal_update(); } /* - if commit_event is Xid_log_event, increase the number of - prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated + if any commit_events are Xid_log_event, increase the number of + prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated if there're prepared xids in it - see the comment in new_file() for an explanation. - If the commit_event is not Xid_log_event (then it's a Query_log_event) - rotate binlog, if necessary. + If no Xid_log_events (then it's all Query_log_event) rotate binlog, + if necessary. */ - if (commit_event && commit_event->get_type_code() == XID_EVENT) + if (xid_count > 0) { - mysql_mutex_lock(&LOCK_prep_xids); - prepared_xids++; - mysql_mutex_unlock(&LOCK_prep_xids); + mark_xids_active(xid_count); } else + { if (rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED)) - goto err; + { + /* + If we fail to rotate, which thread should get the error? + We give the error to the *last* transaction thread; that seems to + make the most sense, as it was the last to write to the log. + */ + last_in_queue->error= ER_ERROR_ON_WRITE; + last_in_queue->commit_errno= errno; + } + } } + + DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered"); + mysql_mutex_lock(&LOCK_commit_ordered); + last_commit_pos_offset= commit_offset; + /* + We cannot unlock LOCK_log until we have locked LOCK_commit_ordered; + otherwise scheduling could allow the next group commit to run ahead of us, + messing up the order of commit_ordered() calls. But as soon as + LOCK_commit_ordered is obtained, we can let the next group commit start. + */ mysql_mutex_unlock(&LOCK_log); + DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log"); + ++num_group_commits; - DBUG_RETURN(0); + if (!opt_optimize_thread_scheduling) + { + /* + If we want to run commit_ordered() each in the transaction's own thread + context, then we need to mark the queue reserved; we need to finish all + threads in one group commit before the next group commit can be allowed + to proceed, and we cannot unlock a simple pthreads mutex in a different + thread from the one that locked it. + */ -err: - if (!write_error) + while (group_commit_queue_busy) + mysql_cond_wait(&COND_queue_busy, &LOCK_commit_ordered); + group_commit_queue_busy= TRUE; + + /* Note that we return with LOCK_commit_ordered locked! */ + DBUG_VOID_RETURN; + } + + /* + Wakeup each participant waiting for our group commit, first calling the + commit_ordered() methods for any transactions doing 2-phase commit. + */ + current= queue; + while (current != NULL) { - write_error= 1; - sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); + group_commit_entry *next; + + DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered"); + ++num_commits; + if (current->cache_mngr->using_xa && !current->error) + run_commit_ordered(current->thd, current->all); + + /* + Careful not to access current->next after waking up the other thread! As + it may change immediately after wakeup. + */ + next= current->next; + if (current != leader) // Don't wake up ourself + current->thd->signal_wakeup_ready(); + current= next; } - mysql_mutex_unlock(&LOCK_log); - DBUG_RETURN(1); + DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered"); + mysql_mutex_unlock(&LOCK_commit_ordered); + + DBUG_VOID_RETURN; } +int +MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry) +{ + binlog_cache_mngr *mngr= entry->cache_mngr; + + if (entry->begin_event->write(&log_file)) + return ER_ERROR_ON_WRITE; + status_var_add(entry->thd->status_var.binlog_bytes_written, + entry->begin_event->data_written); + + if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && + write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE))) + { + entry->error_cache= &mngr->stmt_cache.cache_log; + entry->commit_errno= errno; + return ER_ERROR_ON_WRITE; + } + + if (entry->using_trx_cache && !mngr->trx_cache.empty()) + { + DBUG_EXECUTE_IF("crash_before_writing_xid", + { + if ((write_cache(entry->thd, + mngr->get_binlog_cache_log(TRUE)))) + DBUG_PRINT("info", ("error writing binlog cache")); + else + flush_and_sync(0); + + DBUG_PRINT("info", ("crashing before writing xid")); + DBUG_SUICIDE(); + }); + + if (write_cache(entry->thd, mngr->get_binlog_cache_log(TRUE))) + { + entry->error_cache= &mngr->trx_cache.cache_log; + entry->commit_errno= errno; + return ER_ERROR_ON_WRITE; + } + } + + if (entry->end_event->write(&log_file)) + { + entry->error_cache= NULL; + entry->commit_errno= errno; + return ER_ERROR_ON_WRITE; + } + status_var_add(entry->thd->status_var.binlog_bytes_written, + entry->end_event->data_written); + + if (entry->incident_event) + { + if (entry->incident_event->write(&log_file)) + { + entry->error_cache= NULL; + entry->commit_errno= errno; + return ER_ERROR_ON_WRITE; + } + } + + if (mngr->get_binlog_cache_log(FALSE)->error) // Error on read + { + entry->error_cache= &mngr->stmt_cache.cache_log; + entry->commit_errno= errno; + return ER_ERROR_ON_READ; + } + if (mngr->get_binlog_cache_log(TRUE)->error) // Error on read + { + entry->error_cache= &mngr->trx_cache.cache_log; + entry->commit_errno= errno; + return ER_ERROR_ON_READ; + } + + return 0; +} + /** Wait until we get a signal that the relay log has been updated. @@ -5608,6 +6207,11 @@ void MYSQL_BIN_LOG::close(uint exiting) (exiting & LOG_CLOSE_STOP_EVENT)) { Stop_log_event s; + // the checksumming rule for relay-log case is similar to Rotate + s.checksum_alg= is_relay_log ? + (uint8) relay_log_checksum_alg : (uint8) binlog_checksum_options; + DBUG_ASSERT(!is_relay_log || + relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); s.write(&log_file); bytes_written+= s.data_written; signal_update(); @@ -5724,9 +6328,23 @@ static bool test_if_number(register const char *str, void sql_perror(const char *message) { -#ifdef HAVE_STRERROR +#if defined(_WIN32) + char* buf; + DWORD dw= GetLastError(); + if (FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, NULL, dw, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPSTR)&buf, 0, NULL ) > 0) + { + sql_print_error("%s: %s",message, buf); + LocalFree((HLOCAL)buf); + } + else + { + sql_print_error("%s", message); + } +#elif defined(HAVE_STRERROR) sql_print_error("%s: %s",message, strerror(errno)); -#else +#else perror(message); #endif } @@ -5931,6 +6549,148 @@ void sql_print_information(const char *format, ...) } +void +TC_LOG::run_prepare_ordered(THD *thd, bool all) +{ + Ha_trx_info *ha_info= + all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + + mysql_mutex_assert_owner(&LOCK_prepare_ordered); + for (; ha_info; ha_info= ha_info->next()) + { + handlerton *ht= ha_info->ht(); + if (!ht->prepare_ordered) + continue; + ht->prepare_ordered(ht, thd, all); + } +} + + +void +TC_LOG::run_commit_ordered(THD *thd, bool all) +{ + Ha_trx_info *ha_info= + all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + + mysql_mutex_assert_owner(&LOCK_commit_ordered); + for (; ha_info; ha_info= ha_info->next()) + { + handlerton *ht= ha_info->ht(); + if (!ht->commit_ordered) + continue; + ht->commit_ordered(ht, thd, all); + DEBUG_SYNC(thd, "commit_after_run_commit_ordered"); + } +} + + +int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered, + bool need_commit_ordered) +{ + int cookie; + struct commit_entry entry; + bool is_group_commit_leader; + LINT_INIT(is_group_commit_leader); + + if (need_prepare_ordered) + { + mysql_mutex_lock(&LOCK_prepare_ordered); + run_prepare_ordered(thd, all); + if (need_commit_ordered) + { + /* + Must put us in queue so we can run_commit_ordered() in same sequence + as we did run_prepare_ordered(). + */ + thd->clear_wakeup_ready(); + entry.thd= thd; + commit_entry *previous_queue= commit_ordered_queue; + entry.next= previous_queue; + commit_ordered_queue= &entry; + is_group_commit_leader= (previous_queue == NULL); + } + mysql_mutex_unlock(&LOCK_prepare_ordered); + } + + cookie= 0; + if (xid) + cookie= log_one_transaction(xid); + + if (need_commit_ordered) + { + if (need_prepare_ordered) + { + /* + We did the run_prepare_ordered() serialised, then ran the log_xid() in + parallel. Now we have to do run_commit_ordered() serialised in the + same sequence as run_prepare_ordered(). + + We do this starting from the head of the queue, each thread doing + run_commit_ordered() and signalling the next in queue. + */ + if (is_group_commit_leader) + { + /* The first in queue starts the ball rolling. */ + mysql_mutex_lock(&LOCK_prepare_ordered); + while (commit_ordered_queue_busy) + mysql_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered); + commit_entry *queue= commit_ordered_queue; + commit_ordered_queue= NULL; + /* + Mark the queue busy while we bounce it from one thread to the + next. + */ + commit_ordered_queue_busy= true; + mysql_mutex_unlock(&LOCK_prepare_ordered); + + /* Reverse the queue list so we get correct order. */ + commit_entry *prev= NULL; + while (queue) + { + commit_entry *next= queue->next; + queue->next= prev; + prev= queue; + queue= next; + } + DBUG_ASSERT(prev == &entry && prev->thd == thd); + } + else + { + /* Not first in queue; just wait until previous thread wakes us up. */ + thd->wait_for_wakeup_ready(); + } + } + + /* Only run commit_ordered() if log_xid was successful. */ + if (cookie) + { + mysql_mutex_lock(&LOCK_commit_ordered); + run_commit_ordered(thd, all); + mysql_mutex_unlock(&LOCK_commit_ordered); + } + + if (need_prepare_ordered) + { + commit_entry *next= entry.next; + if (next) + { + next->thd->signal_wakeup_ready(); + } + else + { + mysql_mutex_lock(&LOCK_prepare_ordered); + commit_ordered_queue_busy= false; + mysql_cond_signal(&COND_queue_busy); + mysql_mutex_unlock(&LOCK_prepare_ordered); + } + } + } + + return cookie; +} + + /********* transaction coordinator log for 2pc - mmap() based solution *******/ /* @@ -6068,6 +6828,7 @@ int TC_LOG_MMAP::open(const char *opt_name) mysql_mutex_init(key_LOCK_pool, &LOCK_pool, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_active, &COND_active, 0); mysql_cond_init(key_COND_pool, &COND_pool, 0); + mysql_cond_init(key_COND_queue_busy, &COND_queue_busy, 0); inited=6; @@ -6075,6 +6836,8 @@ int TC_LOG_MMAP::open(const char *opt_name) active=pages; pool=pages+1; pool_last=pages+npages-1; + commit_ordered_queue= NULL; + commit_ordered_queue_busy= false; return 0; @@ -6180,7 +6943,7 @@ int TC_LOG_MMAP::overflow() to the position in memory where xid was logged to. */ -int TC_LOG_MMAP::log_xid(THD *thd, my_xid xid) +int TC_LOG_MMAP::log_one_transaction(my_xid xid) { int err; PAGE *p; @@ -6350,6 +7113,8 @@ void TC_LOG_MMAP::close() mysql_mutex_destroy(&LOCK_active); mysql_mutex_destroy(&LOCK_pool); mysql_cond_destroy(&COND_pool); + mysql_cond_destroy(&COND_active); + mysql_cond_destroy(&COND_queue_busy); case 5: data[0]='A'; // garble the first (signature) byte, in case mysql_file_delete fails case 4: @@ -6529,7 +7294,8 @@ int TC_LOG_BINLOG::open(const char *opt_name) goto err; } - if ((ev= Log_event::read_log_event(&log, 0, &fdle)) && + if ((ev= Log_event::read_log_event(&log, 0, &fdle, + opt_master_verify_checksum)) && ev->get_type_code() == FORMAT_DESCRIPTION_EVENT && ev->flags & LOG_EVENT_BINLOG_IN_USE_F) { @@ -6559,42 +7325,86 @@ void TC_LOG_BINLOG::close() mysql_cond_destroy(&COND_prep_xids); } -/** - @todo - group commit - - @retval - 0 error - @retval - 1 success +/* + Do a binlog log_xid() for a group of transactions, linked through + thd->next_commit_ordered. */ -int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid) +int +TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, + bool need_prepare_ordered __attribute__((unused)), + bool need_commit_ordered __attribute__((unused))) { - DBUG_ENTER("TC_LOG_BINLOG::log"); + int err; + DBUG_ENTER("TC_LOG_BINLOG::log_and_order"); + binlog_cache_mngr *cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - /* - We always commit the entire transaction when writing an XID. Also - note that the return value is inverted. - */ - DBUG_RETURN(!binlog_commit_flush_stmt_cache(thd, cache_mngr) && - !binlog_commit_flush_trx_cache(thd, cache_mngr, xid)); + + cache_mngr->using_xa= TRUE; + cache_mngr->xa_xid= xid; + err= binlog_commit_flush_xid_caches(thd, cache_mngr, all, xid); + + DEBUG_SYNC(thd, "binlog_after_log_and_order"); + + DBUG_RETURN(!err); } -int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +/* + After an XID is logged, we need to hold on to the current binlog file until + it is fully committed in the storage engine. The reason is that crash + recovery only looks at the latest binlog, so we must make sure there are no + outstanding prepared (but not committed) transactions before rotating the + binlog. + + To handle this, we keep a count of outstanding XIDs. This function is used + to increase this count when committing one or more transactions to the + binary log. +*/ +void +TC_LOG_BINLOG::mark_xids_active(uint xid_count) { - DBUG_ENTER("TC_LOG_BINLOG::unlog"); + DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active"); + DBUG_PRINT("info", ("xid_count=%u", xid_count)); + mysql_mutex_lock(&LOCK_prep_xids); + prepared_xids+= xid_count; + mysql_mutex_unlock(&LOCK_prep_xids); + DBUG_VOID_RETURN; +} + +/* + Once an XID is committed, it is safe to rotate the binary log, as it can no + longer be needed during crash recovery. + + This function is called to mark an XID this way. It needs to decrease the + count of pending XIDs, and signal the log rotator thread when it reaches zero. +*/ +void +TC_LOG_BINLOG::mark_xid_done() +{ + my_bool send_signal; + + DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done"); mysql_mutex_lock(&LOCK_prep_xids); // prepared_xids can be 0 if the transaction had ignorable errors. DBUG_ASSERT(prepared_xids >= 0); if (prepared_xids > 0) prepared_xids--; - if (prepared_xids == 0) { + send_signal= (prepared_xids == 0); + mysql_mutex_unlock(&LOCK_prep_xids); + if (send_signal) { DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids)); mysql_cond_signal(&COND_prep_xids); } - mysql_mutex_unlock(&LOCK_prep_xids); - DBUG_RETURN(rotate_and_purge(0)); // as ::write() did not rotate + DBUG_VOID_RETURN; +} + +int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) +{ + DBUG_ENTER("TC_LOG_BINLOG::unlog"); + if (xid) + mark_xid_done(); + /* As ::write_transaction_to_binlog() did not rotate, do it here. */ + DBUG_RETURN(rotate_and_purge(0)); } int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle) @@ -6612,7 +7422,9 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle) fdle->flags&= ~LOG_EVENT_BINLOG_IN_USE_F; // abort on the first error - while ((ev= Log_event::read_log_event(log,0,fdle)) && ev->is_valid()) + while ((ev= Log_event::read_log_event(log, 0, fdle, + opt_master_verify_checksum)) + && ev->is_valid()) { if (ev->get_type_code() == XID_EVENT) { @@ -6663,9 +7475,170 @@ ulonglong mysql_bin_log_file_pos(void) { return (ulonglong) mysql_bin_log.get_log_file()->pos_in_file; } +/* + Get the current position of the MySQL binlog for transaction currently being + committed. + + This is valid to call from within storage engine commit_ordered() and + commit() methods only. + + Since it stores the position inside THD, it is safe to call without any + locking. +*/ +void +mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file) +{ + binlog_cache_mngr *cache_mngr; + if (opt_bin_log && + (cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton))) + { + *out_file= cache_mngr->last_commit_pos_file; + *out_pos= (ulonglong)(cache_mngr->last_commit_pos_offset); + } + else + { + *out_file= NULL; + *out_pos= 0; + } +} #endif /* INNODB_COMPATIBILITY_HOOKS */ +static void +binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var, + void *var_ptr, const void *save) +{ + ulong value= *((ulong *)save); + + mysql_mutex_lock(mysql_bin_log.get_log_lock()); + if(mysql_bin_log.is_open()) + { + uint flags= RP_FORCE_ROTATE | RP_LOCK_LOG_IS_ALREADY_LOCKED | + (binlog_checksum_options != (uint) value? + RP_BINLOG_CHECKSUM_ALG_CHANGE : 0); + if (flags & RP_BINLOG_CHECKSUM_ALG_CHANGE) + mysql_bin_log.checksum_alg_reset= (uint8) value; + mysql_bin_log.rotate_and_purge(flags); + } + else + { + binlog_checksum_options= value; + } + DBUG_ASSERT((ulong) binlog_checksum_options == value); + DBUG_ASSERT(mysql_bin_log.checksum_alg_reset == BINLOG_CHECKSUM_ALG_UNDEF); + mysql_mutex_unlock(mysql_bin_log.get_log_lock()); +} + + +static int show_binlog_vars(THD *thd, SHOW_VAR *var, char *buff) +{ + mysql_bin_log.set_status_variables(thd); + var->type= SHOW_ARRAY; + var->value= (char *)&binlog_status_vars_detail; + return 0; +} + +static SHOW_VAR binlog_status_vars_top[]= { + {"binlog", (char *) &show_binlog_vars, SHOW_FUNC}, + {NullS, NullS, SHOW_LONG} +}; + +static MYSQL_SYSVAR_BOOL( + optimize_thread_scheduling, + opt_optimize_thread_scheduling, + PLUGIN_VAR_READONLY, + "Run fast part of group commit in a single thread, to optimize kernel " + "thread scheduling. On by default. Disable to run each transaction in group " + "commit in its own thread, which can be slower at very high concurrency. " + "This option is mostly for testing one algorithm versus the other, and it " + "should not normally be necessary to change it.", + NULL, + NULL, + 1); + +static MYSQL_SYSVAR_ENUM( + checksum, + binlog_checksum_options, + PLUGIN_VAR_RQCMDARG, + "Type of BINLOG_CHECKSUM_ALG. Include checksum for " + "log events in the binary log. Possible values are NONE and CRC32; " + "default is NONE.", + NULL, + binlog_checksum_update, + BINLOG_CHECKSUM_ALG_OFF, + &binlog_checksum_typelib); + +#ifndef DBUG_OFF +static MYSQL_SYSVAR_ULONG( + dbug_fsync_sleep, + opt_binlog_dbug_fsync_sleep, + PLUGIN_VAR_RQCMDARG, + "Extra sleep (in microseconds) to add to binlog fsync(), for debugging", + NULL, + NULL, + 0, + 0, + ULONG_MAX, + 0); +#endif + +static struct st_mysql_sys_var *binlog_sys_vars[]= +{ + MYSQL_SYSVAR(optimize_thread_scheduling), + MYSQL_SYSVAR(checksum), +#ifndef DBUG_OFF + MYSQL_SYSVAR(dbug_fsync_sleep), +#endif + NULL +}; + + +/* + Copy out the non-directory part of binlog position filename for the + `binlog_snapshot_file' status variable, same way as it is done for + SHOW MASTER STATUS. +*/ +static void +set_binlog_snapshot_file(const char *src) +{ + int dir_len = dirname_length(src); + strmake(binlog_snapshot_file, src + dir_len, sizeof(binlog_snapshot_file)-1); +} + +/* + Copy out current values of status variables, for SHOW STATUS or + information_schema.global_status. + + This is called only under LOCK_status, so we can fill in a static array. +*/ +void +TC_LOG_BINLOG::set_status_variables(THD *thd) +{ + binlog_cache_mngr *cache_mngr; + + if (thd && opt_bin_log) + cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + else + cache_mngr= 0; + + bool have_snapshot= (cache_mngr && cache_mngr->last_commit_pos_file[0] != 0); + mysql_mutex_lock(&LOCK_commit_ordered); + binlog_status_var_num_commits= this->num_commits; + binlog_status_var_num_group_commits= this->num_group_commits; + if (!have_snapshot) + { + set_binlog_snapshot_file(last_commit_pos_file); + binlog_snapshot_position= last_commit_pos_offset; + } + mysql_mutex_unlock(&LOCK_commit_ordered); + + if (have_snapshot) + { + set_binlog_snapshot_file(cache_mngr->last_commit_pos_file); + binlog_snapshot_position= cache_mngr->last_commit_pos_offset; + } +} + struct st_mysql_storage_engine binlog_storage_engine= { MYSQL_HANDLERTON_INTERFACE_VERSION }; @@ -6680,8 +7653,8 @@ mysql_declare_plugin(binlog) binlog_init, /* Plugin Init */ NULL, /* Plugin Deinit */ 0x0100 /* 1.0 */, - NULL, /* status variables */ - NULL, /* system variables */ + binlog_status_vars_top, /* status variables */ + binlog_sys_vars, /* system variables */ NULL /* config options */ } mysql_declare_plugin_end; @@ -6696,8 +7669,8 @@ maria_declare_plugin(binlog) binlog_init, /* Plugin Init */ NULL, /* Plugin Deinit */ 0x0100 /* 1.0 */, - NULL, /* status variables */ - NULL, /* system variables */ + binlog_status_vars_top, /* status variables */ + binlog_sys_vars, /* system variables */ "1.0", /* string version */ MariaDB_PLUGIN_MATURITY_STABLE /* maturity */ } |