Diffstat (limited to 'sql/log.cc')
-rw-r--r--  sql/log.cc | 2876
1 file changed, 2475 insertions(+), 401 deletions(-)
diff --git a/sql/log.cc b/sql/log.cc
index 54b2975349c..e1886b2572f 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -25,7 +25,7 @@
Abort logging when we get an error in reading or writing log files
*/
-#include "my_global.h" /* NO_EMBEDDED_ACCESS_CHECKS */
+#include <my_global.h> /* NO_EMBEDDED_ACCESS_CHECKS */
#include "sql_priv.h"
#include "log.h"
#include "sql_base.h" // open_log_table
@@ -40,6 +40,7 @@
#include "rpl_rli.h"
#include "sql_audit.h"
#include "log_slow.h"
+#include "mysqld.h"
#include <my_dir.h>
#include <stdarg.h>
@@ -53,6 +54,7 @@
#include "rpl_handler.h"
#include "debug_sync.h"
#include "sql_show.h"
+#include "my_pthread.h"
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
@@ -71,6 +73,8 @@ static int binlog_init(void *p);
static int binlog_close_connection(handlerton *hton, THD *thd);
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv);
static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv);
+static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton,
+ THD *thd);
static int binlog_commit(handlerton *hton, THD *thd, bool all);
static int binlog_rollback(handlerton *hton, THD *thd, bool all);
static int binlog_prepare(handlerton *hton, THD *thd, bool all);
@@ -86,10 +90,14 @@ ulong opt_binlog_dbug_fsync_sleep= 0;
#endif
mysql_mutex_t LOCK_prepare_ordered;
+mysql_cond_t COND_prepare_ordered;
mysql_mutex_t LOCK_commit_ordered;
static ulonglong binlog_status_var_num_commits;
static ulonglong binlog_status_var_num_group_commits;
+static ulonglong binlog_status_group_commit_trigger_count;
+static ulonglong binlog_status_group_commit_trigger_lock_wait;
+static ulonglong binlog_status_group_commit_trigger_timeout;
static char binlog_snapshot_file[FN_REFLEN];
static ulonglong binlog_snapshot_position;
@@ -99,6 +107,12 @@ static SHOW_VAR binlog_status_vars_detail[]=
(char *)&binlog_status_var_num_commits, SHOW_LONGLONG},
{"group_commits",
(char *)&binlog_status_var_num_group_commits, SHOW_LONGLONG},
+ {"group_commit_trigger_count",
+ (char *)&binlog_status_group_commit_trigger_count, SHOW_LONGLONG},
+ {"group_commit_trigger_lock_wait",
+ (char *)&binlog_status_group_commit_trigger_lock_wait, SHOW_LONGLONG},
+ {"group_commit_trigger_timeout",
+ (char *)&binlog_status_group_commit_trigger_timeout, SHOW_LONGLONG},
{"snapshot_file",
(char *)&binlog_snapshot_file, SHOW_CHAR},
{"snapshot_position",
@@ -106,6 +120,18 @@ static SHOW_VAR binlog_status_vars_detail[]=
{NullS, NullS, SHOW_LONG}
};
+/*
+ Variables for the binlog background thread.
+ Protected by the MYSQL_BIN_LOG::LOCK_binlog_background_thread mutex.
+ */
+static bool binlog_background_thread_started= false;
+static bool binlog_background_thread_stop= false;
+static MYSQL_BIN_LOG::xid_count_per_binlog *
+ binlog_background_thread_queue= NULL;
+
+static bool start_binlog_background_thread();
+
+static rpl_binlog_state rpl_global_gtid_binlog_state;
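These flags implement a simple stop handshake with the binlog background thread: the requester sets binlog_background_thread_stop and signals, and the thread clears the flag again once it has finished shutting down (the requester side is visible in the cleanup() change further down). A minimal, generic sketch of that handshake, using standard C++ primitives instead of the server's mysql_mutex/mysql_cond wrappers; all names here are illustrative, not the patch's own:

#include <condition_variable>
#include <mutex>

static std::mutex              bg_mutex;
static std::condition_variable bg_cond;
static bool bg_stop= false;          // plays the role of binlog_background_thread_stop

// Worker: wait until asked to stop, finish pending work, then acknowledge
// by clearing the flag and signalling back.
static void background_thread()
{
  std::unique_lock<std::mutex> lk(bg_mutex);
  while (!bg_stop)
    bg_cond.wait(lk);                // a real worker would also wait for work here
  /* ... flush any pending work ... */
  bg_stop= false;                    // "I have stopped"
  bg_cond.notify_all();
}

// Requester: set the flag, wake the worker, then wait until the worker has
// cleared the flag again.
static void stop_background_thread()
{
  std::unique_lock<std::mutex> lk(bg_mutex);
  bg_stop= true;
  bg_cond.notify_all();
  while (bg_stop)
    bg_cond.wait(lk);
}

A caller would run the worker with std::thread t(background_thread); and, at shutdown, call stop_background_thread() followed by t.join().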
/**
purge logs, master and slave sides both, related error code
@@ -157,9 +183,9 @@ public:
virtual bool handle_condition(THD *thd,
uint sql_errno,
const char* sql_state,
- MYSQL_ERROR::enum_warning_level level,
+ Sql_condition::enum_warning_level level,
const char* msg,
- MYSQL_ERROR ** cond_hdl);
+ Sql_condition ** cond_hdl);
const char *message() const { return m_message; }
};
@@ -167,9 +193,9 @@ bool
Silence_log_table_errors::handle_condition(THD *,
uint,
const char*,
- MYSQL_ERROR::enum_warning_level,
+ Sql_condition::enum_warning_level,
const char* msg,
- MYSQL_ERROR ** cond_hdl)
+ Sql_condition ** cond_hdl)
{
*cond_hdl= NULL;
strmake_buf(m_message, msg);
@@ -482,6 +508,14 @@ public:
*/
bool using_xa;
my_xid xa_xid;
+ bool need_unlog;
+ /*
+ Id of binlog that transaction was written to; only needed if need_unlog is
+ true.
+ */
+ ulong binlog_id;
+ /* Set if we get an error during commit that must be returned from unlog(). */
+ bool delayed_error;
private:
@@ -505,35 +539,51 @@ bool LOGGER::is_log_table_enabled(uint log_table_type)
}
-/* Check if a given table is opened log table */
-int check_if_log_table(size_t db_len, const char *db, size_t table_name_len,
- const char *table_name, bool check_if_opened)
+/**
+ Check if a given table is opened log table
+
+ @param table Table to check
+ @param check_if_opened Only fail if it's a log table in use
+ @param error_msg String to put in error message if not ok.
+ No error message if 0
+ @return 0 ok
+ @return # Type of log file
+ */
+
+int check_if_log_table(const TABLE_LIST *table,
+ bool check_if_opened,
+ const char *error_msg)
{
- if (db_len == 5 &&
- !(lower_case_table_names ?
- my_strcasecmp(system_charset_info, db, "mysql") :
- strcmp(db, "mysql")))
+ int result= 0;
+ if (table->db_length == 5 &&
+ !my_strcasecmp(table_alias_charset, table->db, "mysql"))
{
- if (table_name_len == 11 && !(lower_case_table_names ?
- my_strcasecmp(system_charset_info,
- table_name, "general_log") :
- strcmp(table_name, "general_log")))
+ const char *table_name= table->table_name;
+
+ if (table->table_name_length == 11 &&
+ !my_strcasecmp(table_alias_charset, table_name, "general_log"))
{
- if (!check_if_opened || logger.is_log_table_enabled(QUERY_LOG_GENERAL))
- return QUERY_LOG_GENERAL;
- return 0;
+ result= QUERY_LOG_GENERAL;
+ goto end;
}
- if (table_name_len == 8 && !(lower_case_table_names ?
- my_strcasecmp(system_charset_info, table_name, "slow_log") :
- strcmp(table_name, "slow_log")))
+ if (table->table_name_length == 8 &&
+ !my_strcasecmp(table_alias_charset, table_name, "slow_log"))
{
- if (!check_if_opened || logger.is_log_table_enabled(QUERY_LOG_SLOW))
- return QUERY_LOG_SLOW;
- return 0;
+ result= QUERY_LOG_SLOW;
+ goto end;
}
}
return 0;
+
+end:
+ if (!check_if_opened || logger.is_log_table_enabled(result))
+ {
+ if (error_msg)
+ my_error(ER_BAD_LOG_STATEMENT, MYF(0), error_msg);
+ return result;
+ }
+ return 0;
}
@@ -581,7 +631,7 @@ void Log_to_csv_event_handler::cleanup()
indicated in the return value.
@retval FALSE OK
- @retval TRUE error occured
+ @retval TRUE error occurred
*/
bool Log_to_csv_event_handler::
@@ -668,7 +718,8 @@ bool Log_to_csv_event_handler::
/* do a write */
if (table->field[1]->store(user_host, user_host_len, client_cs) ||
table->field[2]->store((longlong) thread_id, TRUE) ||
- table->field[3]->store((longlong) server_id, TRUE) ||
+ table->field[3]->store((longlong) global_system_variables.server_id,
+ TRUE) ||
table->field[4]->store(command_type, command_type_len, client_cs))
goto err;
@@ -745,7 +796,7 @@ err:
RETURN
FALSE - OK
- TRUE - error occured
+ TRUE - error occurred
*/
bool Log_to_csv_event_handler::
@@ -763,8 +814,8 @@ bool Log_to_csv_event_handler::
Open_tables_backup open_tables_backup;
CHARSET_INFO *client_cs= thd->variables.character_set_client;
bool save_time_zone_used;
- long query_time= (long) min(query_utime/1000000, TIME_MAX_VALUE_SECONDS);
- long lock_time= (long) min(lock_utime/1000000, TIME_MAX_VALUE_SECONDS);
+ long query_time= (long) MY_MIN(query_utime/1000000, TIME_MAX_VALUE_SECONDS);
+ long lock_time= (long) MY_MIN(lock_utime/1000000, TIME_MAX_VALUE_SECONDS);
long query_time_micro= (long) (query_utime % 1000000);
long lock_time_micro= (long) (lock_utime % 1000000);
@@ -826,10 +877,10 @@ bool Log_to_csv_event_handler::
if (table->field[3]->store_time(&t))
goto err;
/* rows_sent */
- if (table->field[4]->store((longlong) thd->sent_row_count, TRUE))
+ if (table->field[4]->store((longlong) thd->get_sent_row_count(), TRUE))
goto err;
/* rows_examined */
- if (table->field[5]->store((longlong) thd->examined_row_count, TRUE))
+ if (table->field[5]->store((longlong) thd->get_examined_row_count(), TRUE))
goto err;
/* fill database field */
@@ -865,7 +916,7 @@ bool Log_to_csv_event_handler::
table->field[8]->set_notnull();
}
- if (table->field[9]->store((longlong) server_id, TRUE))
+ if (table->field[9]->store((longlong)global_system_variables.server_id, TRUE))
goto err;
table->field[9]->set_notnull();
@@ -877,6 +928,9 @@ bool Log_to_csv_event_handler::
if (table->field[10]->store(sql_text, sql_text_len, client_cs) < 0)
goto err;
+ if (table->field[11]->store((longlong) thd->thread_id, TRUE))
+ goto err;
+
/* log table entries are not replicated */
if (table->file->ha_write_row(table->record[0]))
goto err;
@@ -1047,7 +1101,7 @@ void Log_to_file_event_handler::flush()
RETURN
FALSE - OK
- TRUE - error occured
+ TRUE - error occurred
*/
bool LOGGER::error_log_print(enum loglevel level, const char *format,
@@ -1205,7 +1259,7 @@ bool LOGGER::flush_general_log()
RETURN
FALSE OK
- TRUE error occured
+ TRUE error occurred
*/
bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length,
@@ -1240,7 +1294,7 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length,
/* fill in user_host value: the format is "%s[%s] @ %s [%s]" */
user_host_len= (strxnmov(user_host_buff, MAX_USER_HOST_SIZE,
- sctx->priv_user ? sctx->priv_user : "", "[",
+ sctx->priv_user, "[",
sctx->user ? sctx->user : (thd->slave_thread ? "SQL_SLAVE" : ""), "] @ ",
sctx->host ? sctx->host : "", " [",
sctx->ip ? sctx->ip : "", "]", NullS) -
@@ -1256,8 +1310,8 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length,
if (!query)
{
is_command= TRUE;
- query= command_name[thd->command].str;
- query_length= command_name[thd->command].length;
+ query= command_name[thd->get_command()].str;
+ query_length= command_name[thd->get_command()].length;
}
for (current_handler= slow_log_handler_list; *current_handler ;)
@@ -1589,6 +1643,8 @@ int binlog_init(void *p)
binlog_hton->close_connection= binlog_close_connection;
binlog_hton->savepoint_set= binlog_savepoint_set;
binlog_hton->savepoint_rollback= binlog_savepoint_rollback;
+ binlog_hton->savepoint_rollback_can_release_mdl=
+ binlog_savepoint_rollback_can_release_mdl;
binlog_hton->commit= binlog_commit;
binlog_hton->rollback= binlog_rollback;
binlog_hton->prepare= binlog_prepare;
@@ -1633,6 +1689,7 @@ static int binlog_close_connection(handlerton *hton, THD *thd)
contain updates to non-transactional tables. Or it can be a flush of
a statement cache.
*/
+
static int
binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
Log_event *end_ev, bool all, bool using_stmt,
@@ -1640,6 +1697,7 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
{
int error= 0;
DBUG_ENTER("binlog_flush_cache");
+ DBUG_PRINT("enter", ("end_ev: %p", end_ev));
if ((using_stmt && !cache_mngr->stmt_cache.empty()) ||
(using_trx && !cache_mngr->trx_cache.empty()))
@@ -1663,6 +1721,20 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
end_ev, all,
using_stmt, using_trx);
}
+ else
+ {
+ /*
+ This can happen in row-format binlog with something like
+ BEGIN; INSERT INTO nontrans_table; INSERT IGNORE INTO trans_table;
+ The nontrans_table is written directly into the binlog before commit,
+ and if the trans_table is ignored there will be no rows to write when
+ we get here.
+
+ So there is no work to do. Therefore, we will not increment any XID
+ count, so we must not decrement any XID count in unlog().
+ */
+ cache_mngr->need_unlog= 0;
+ }
cache_mngr->reset(using_stmt, using_trx);
DBUG_ASSERT((!using_stmt || cache_mngr->stmt_cache.empty()) &&
@@ -1684,9 +1756,10 @@ static inline int
binlog_commit_flush_stmt_cache(THD *thd, bool all,
binlog_cache_mngr *cache_mngr)
{
+ DBUG_ENTER("binlog_commit_flush_stmt_cache");
Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
FALSE, TRUE, TRUE, 0);
- return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE));
+ DBUG_RETURN(binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE));
}
/**
@@ -1701,9 +1774,10 @@ binlog_commit_flush_stmt_cache(THD *thd, bool all,
static inline int
binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr)
{
+ DBUG_ENTER("binlog_commit_flush_trx_cache");
Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
TRUE, TRUE, TRUE, 0);
- return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE));
+ DBUG_RETURN(binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE));
}
/**
@@ -1821,6 +1895,32 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all)
return 0;
}
+/*
+ We flush the cache wrapped in a begin/rollback if:
+ . aborting a single or multi-statement transaction and;
+ . the OPTION_KEEP_LOG is active or;
+ . the format is STMT and a non-trans table was updated or;
+ . the format is MIXED and a temporary non-trans table was
+ updated or;
+ . the format is MIXED, non-trans table was updated and
+ aborting a single statement transaction;
+*/
+static bool trans_cannot_safely_rollback(THD *thd, bool all)
+{
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+
+ return ((thd->variables.option_bits & OPTION_KEEP_LOG) ||
+ (trans_has_updated_non_trans_table(thd) &&
+ thd->variables.binlog_format == BINLOG_FORMAT_STMT) ||
+ (cache_mngr->trx_cache.changes_to_non_trans_temp_table() &&
+ thd->variables.binlog_format == BINLOG_FORMAT_MIXED) ||
+ (trans_has_updated_non_trans_table(thd) &&
+ ending_single_stmt_trans(thd,all) &&
+ thd->variables.binlog_format == BINLOG_FORMAT_MIXED));
+}
+
+
/**
This function is called once after each statement.
@@ -1941,25 +2041,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
}
else if (!error)
{
- /*
- We flush the cache wrapped in a beging/rollback if:
- . aborting a single or multi-statement transaction and;
- . the OPTION_KEEP_LOG is active or;
- . the format is STMT and a non-trans table was updated or;
- . the format is MIXED and a temporary non-trans table was
- updated or;
- . the format is MIXED, non-trans table was updated and
- aborting a single statement transaction;
- */
- if (ending_trans(thd, all) &&
- ((thd->variables.option_bits & OPTION_KEEP_LOG) ||
- (trans_has_updated_non_trans_table(thd) &&
- thd->variables.binlog_format == BINLOG_FORMAT_STMT) ||
- (cache_mngr->trx_cache.changes_to_non_trans_temp_table() &&
- thd->variables.binlog_format == BINLOG_FORMAT_MIXED) ||
- (trans_has_updated_non_trans_table(thd) &&
- ending_single_stmt_trans(thd,all) &&
- thd->variables.binlog_format == BINLOG_FORMAT_MIXED)))
+ if (ending_trans(thd, all) && trans_cannot_safely_rollback(thd, all))
error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr);
/*
Truncate the cache if:
@@ -1988,6 +2070,21 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
DBUG_RETURN(error);
}
+
+void binlog_reset_cache(THD *thd)
+{
+ binlog_cache_mngr *const cache_mngr= opt_bin_log ?
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton) : 0;
+ DBUG_ENTER("binlog_reset_cache");
+ if (cache_mngr)
+ {
+ thd->binlog_remove_pending_rows_event(TRUE, TRUE);
+ cache_mngr->reset(true, true);
+ }
+ DBUG_VOID_RETURN;
+}
+
+
void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::set_write_error");
@@ -2025,7 +2122,7 @@ bool MYSQL_BIN_LOG::check_write_error(THD *thd)
if (!thd->is_error())
DBUG_RETURN(checked);
- switch (thd->stmt_da->sql_errno())
+ switch (thd->get_stmt_da()->sql_errno())
{
case ER_TRANS_CACHE_FULL:
case ER_STMT_CACHE_FULL:
@@ -2066,9 +2163,7 @@ bool MYSQL_BIN_LOG::check_write_error(THD *thd)
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
{
DBUG_ENTER("binlog_savepoint_set");
-
- binlog_trans_log_savepos(thd, (my_off_t*) sv);
- /* Write it to the binary log */
+ int error= 1;
char buf[1024];
String log_query(buf, sizeof(buf), &my_charset_bin);
@@ -2077,10 +2172,25 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
thd->lex->ident.str, thd->lex->ident.length))
DBUG_RETURN(1);
int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
- Query_log_event qinfo(thd, log_query.ptr(), log_query.length(),
+ Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(),
TRUE, FALSE, TRUE, errcode);
- int ret= mysql_bin_log.write(&qinfo);
- DBUG_RETURN(ret);
+ /*
+ We cannot record the position before writing the statement
+ because a rollback to a savepoint (e.g. consider it "S") would
+ prevent the savepoint statement (i.e. "SAVEPOINT S") from being
+ written to the binary log despite the fact that the server could
+ still issue other rollback statements to the same savepoint (i.e.
+ "S").
+ Given that the savepoint is valid until the server releases it,
+ ie, until the transaction commits or it is released explicitly,
+ we need to log it anyway so that we don't have "ROLLBACK TO S"
+ or "RELEASE S" without the preceding "SAVEPOINT S" in the binary
+ log.
+ */
+ if (!(error= mysql_bin_log.write(&qinfo)))
+ binlog_trans_log_savepos(thd, (my_off_t*) sv);
+
+ DBUG_RETURN(error);
}
static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
@@ -2111,6 +2221,30 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
}
+/**
+ Check whether binlog state allows to safely release MDL locks after
+ rollback to savepoint.
+
+ @param hton The binlog handlerton.
+ @param thd The client thread that executes the transaction.
+
+ @return true - It is safe to release MDL locks.
+ false - If it is not.
+*/
+static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton,
+ THD *thd)
+{
+ DBUG_ENTER("binlog_savepoint_rollback_can_release_mdl");
+ /*
+ If we have not updated any non-transactional tables rollback
+ to savepoint will simply truncate binlog cache starting from
+ SAVEPOINT command. So it should be safe to release MDL acquired
+ after SAVEPOINT command in this case.
+ */
+ DBUG_RETURN(!trans_cannot_safely_rollback(thd, true));
+}
+
+
int check_binlog_magic(IO_CACHE* log, const char** errmsg)
{
uchar magic[4];
@@ -2245,7 +2379,7 @@ static int find_uniq_filename(char *name)
DBUG_RETURN(1);
}
file_info= dir_info->dir_entry;
- for (i= dir_info->number_off_files ; i-- ; file_info++)
+ for (i= dir_info->number_of_files ; i-- ; file_info++)
{
if (strncmp(file_info->name, start, length) == 0 &&
test_if_number(file_info->name+length, &number,0))
@@ -2538,7 +2672,8 @@ int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name)
{
if (!fn_ext(log_name)[0])
{
- if (find_uniq_filename(new_name))
+ if (DBUG_EVALUATE_IF("binlog_inject_new_name_error", TRUE, FALSE) ||
+ find_uniq_filename(new_name))
{
if (current_thd)
my_printf_error(ER_NO_UNIQUE_LOGFILE, ER(ER_NO_UNIQUE_LOGFILE),
@@ -2567,16 +2702,16 @@ int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name)
void MYSQL_QUERY_LOG::reopen_file()
{
char *save_name;
-
DBUG_ENTER("MYSQL_LOG::reopen_file");
+
+ mysql_mutex_lock(&LOCK_log);
if (!is_open())
{
DBUG_PRINT("info",("log is closed"));
+ mysql_mutex_unlock(&LOCK_log);
DBUG_VOID_RETURN;
}
- mysql_mutex_lock(&LOCK_log);
-
save_name= name;
name= 0; // Don't free name
close(LOG_CLOSE_TO_BE_OPENED);
@@ -2620,7 +2755,7 @@ void MYSQL_QUERY_LOG::reopen_file()
RETURN
FALSE - OK
- TRUE - error occured
+ TRUE - error occurred
*/
bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host,
@@ -2722,7 +2857,7 @@ err:
RETURN
FALSE - OK
- TRUE - error occured
+ TRUE - error occurred
*/
bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
@@ -2735,13 +2870,6 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
DBUG_ENTER("MYSQL_QUERY_LOG::write");
mysql_mutex_lock(&LOCK_log);
-
- if (!is_open())
- {
- mysql_mutex_unlock(&LOCK_log);
- DBUG_RETURN(0);
- }
-
if (is_open())
{ // Safety against reopen
int tmp_errno= 0;
@@ -2786,8 +2914,8 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
(ulong) thd->thread_id, (thd->db ? thd->db : ""),
((thd->query_plan_flags & QPLAN_QC) ? "Yes" : "No"),
query_time_buff, lock_time_buff,
- (ulong) thd->sent_row_count,
- (ulong) thd->examined_row_count) == (size_t) -1)
+ (ulong) thd->get_sent_row_count(),
+ (ulong) thd->get_examined_row_count()) == (size_t) -1)
tmp_errno= errno;
if ((thd->variables.log_slow_verbosity & LOG_SLOW_VERBOSITY_QUERY_PLAN) &&
(thd->query_plan_flags &
@@ -2796,7 +2924,8 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
my_b_printf(&log_file,
"# Full_scan: %s Full_join: %s "
"Tmp_table: %s Tmp_table_on_disk: %s\n"
- "# Filesort: %s Filesort_on_disk: %s Merge_passes: %lu\n",
+ "# Filesort: %s Filesort_on_disk: %s Merge_passes: %lu "
+ "Priority_queue: %s\n",
((thd->query_plan_flags & QPLAN_FULL_SCAN) ? "Yes" : "No"),
((thd->query_plan_flags & QPLAN_FULL_JOIN) ? "Yes" : "No"),
((thd->query_plan_flags & QPLAN_TMP_TABLE) ? "Yes" : "No"),
@@ -2804,8 +2933,20 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
((thd->query_plan_flags & QPLAN_FILESORT) ? "Yes" : "No"),
((thd->query_plan_flags & QPLAN_FILESORT_DISK) ?
"Yes" : "No"),
- thd->query_plan_fsort_passes) == (size_t) -1)
+ thd->query_plan_fsort_passes,
+ ((thd->query_plan_flags & QPLAN_FILESORT_PRIORITY_QUEUE) ?
+ "Yes" : "No")
+ ) == (size_t) -1)
tmp_errno= errno;
+ if (thd->variables.log_slow_verbosity & LOG_SLOW_VERBOSITY_EXPLAIN &&
+ thd->lex->explain)
+ {
+ StringBuffer<128> buf;
+ DBUG_ASSERT(!thd->free_list);
+ if (!print_explain_query(thd->lex, thd, &buf))
+ my_b_printf(&log_file, "%s", buf.c_ptr_safe());
+ thd->free_items();
+ }
if (thd->db && strcmp(thd->db, db))
{ // Database changed
if (my_b_printf(&log_file,"use %s;\n",thd->db) == (size_t) -1)
@@ -2895,7 +3036,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
{
char *p= fn_ext(log_name);
uint length= (uint) (p - log_name);
- strmake(buff, log_name, min(length, FN_REFLEN-1));
+ strmake(buff, log_name, MY_MIN(length, FN_REFLEN-1));
return (const char*)buff;
}
return log_name;
@@ -2904,15 +3045,19 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
- :bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
- need_start_event(TRUE),
+ :reset_master_pending(0), mark_xid_done_waiting(0),
+ bytes_written(0), file_id(1), open_count(1),
group_commit_queue(0), group_commit_queue_busy(FALSE),
num_commits(0), num_group_commits(0),
+ group_commit_trigger_count(0), group_commit_trigger_timeout(0),
+ group_commit_trigger_lock_wait(0),
sync_period_ptr(sync_period), sync_counter(0),
+ state_file_deleted(false), binlog_state_recover_done(false),
is_relay_log(0), signal_cnt(0),
checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF),
relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
- description_event_for_exec(0), description_event_for_queue(0)
+ description_event_for_exec(0), description_event_for_queue(0),
+ current_binlog_id(0)
{
/*
We don't want to initialize locks here as such initialization depends on
@@ -2932,23 +3077,65 @@ void MYSQL_BIN_LOG::cleanup()
DBUG_ENTER("cleanup");
if (inited)
{
+ xid_count_per_binlog *b;
+
+ /* Wait for the binlog background thread to stop. */
+ if (!is_relay_log && binlog_background_thread_started)
+ {
+ mysql_mutex_lock(&LOCK_binlog_background_thread);
+ binlog_background_thread_stop= true;
+ mysql_cond_signal(&COND_binlog_background_thread);
+ while (binlog_background_thread_stop)
+ mysql_cond_wait(&COND_binlog_background_thread_end,
+ &LOCK_binlog_background_thread);
+ mysql_mutex_unlock(&LOCK_binlog_background_thread);
+ binlog_background_thread_started= false;
+ }
+
inited= 0;
+ mysql_mutex_lock(&LOCK_log);
close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT);
+ mysql_mutex_unlock(&LOCK_log);
delete description_event_for_queue;
delete description_event_for_exec;
+
+ while ((b= binlog_xid_count_list.get()))
+ {
+ /*
+ There should be no pending XIDs at shutdown, and only one entry (for
+ the active binlog file) in the list.
+ */
+ DBUG_ASSERT(b->xid_count == 0);
+ DBUG_ASSERT(!binlog_xid_count_list.head());
+ my_free(b);
+ }
+
mysql_mutex_destroy(&LOCK_log);
mysql_mutex_destroy(&LOCK_index);
+ mysql_mutex_destroy(&LOCK_xid_list);
+ mysql_mutex_destroy(&LOCK_binlog_background_thread);
mysql_cond_destroy(&update_cond);
+ mysql_cond_destroy(&COND_queue_busy);
+ mysql_cond_destroy(&COND_xid_list);
+ mysql_cond_destroy(&COND_binlog_background_thread);
+ mysql_cond_destroy(&COND_binlog_background_thread_end);
}
+
+ /*
+ Free data for global binlog state.
+ We can't do that automatically as we need to do this before
+ safemalloc is shut down.
+ */
+ if (!is_relay_log)
+ rpl_global_gtid_binlog_state.free();
DBUG_VOID_RETURN;
}
/* Init binlog-specific vars */
-void MYSQL_BIN_LOG::init(bool no_auto_events_arg, ulong max_size_arg)
+void MYSQL_BIN_LOG::init(ulong max_size_arg)
{
DBUG_ENTER("MYSQL_BIN_LOG::init");
- no_auto_events= no_auto_events_arg;
max_size= max_size_arg;
DBUG_PRINT("info",("max_size: %lu", max_size));
DBUG_VOID_RETURN;
@@ -2960,8 +3147,18 @@ void MYSQL_BIN_LOG::init_pthread_objects()
MYSQL_LOG::init_pthread_objects();
mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW);
mysql_mutex_setflags(&LOCK_index, MYF_NO_DEADLOCK_DETECTION);
+ mysql_mutex_init(key_BINLOG_LOCK_xid_list,
+ &LOCK_xid_list, MY_MUTEX_INIT_FAST);
mysql_cond_init(m_key_update_cond, &update_cond, 0);
mysql_cond_init(m_key_COND_queue_busy, &COND_queue_busy, 0);
+ mysql_cond_init(key_BINLOG_COND_xid_list, &COND_xid_list, 0);
+
+ mysql_mutex_init(key_BINLOG_LOCK_binlog_background_thread,
+ &LOCK_binlog_background_thread, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_BINLOG_COND_binlog_background_thread,
+ &COND_binlog_background_thread, 0);
+ mysql_cond_init(key_BINLOG_COND_binlog_background_thread_end,
+ &COND_binlog_background_thread_end, 0);
}
@@ -3049,16 +3246,31 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
enum_log_type log_type_arg,
const char *new_name,
enum cache_type io_cache_type_arg,
- bool no_auto_events_arg,
ulong max_size_arg,
bool null_created_arg,
bool need_mutex)
{
File file= -1;
-
+ xid_count_per_binlog *new_xid_list_entry= NULL, *b;
DBUG_ENTER("MYSQL_BIN_LOG::open");
DBUG_PRINT("enter",("log_type: %d",(int) log_type_arg));
+ mysql_mutex_assert_owner(&LOCK_log);
+
+ if (!is_relay_log)
+ {
+ if (!binlog_state_recover_done)
+ {
+ binlog_state_recover_done= true;
+ if (do_binlog_recovery(opt_bin_logname, false))
+ DBUG_RETURN(1);
+ }
+
+ if (!binlog_background_thread_started &&
+ start_binlog_background_thread())
+ DBUG_RETURN(1);
+ }
+
if (init_and_set_log_file_name(log_name, new_name, log_type_arg,
io_cache_type_arg))
{
@@ -3078,7 +3290,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
it may be good to consider what actually happens when
open_purge_index_file succeeds but register or sync fails.
- Perhaps we might need the code below in MYSQL_LOG_BIN::cleanup
+ Perhaps we might need the code below in MYSQL_BIN_LOG::cleanup
for "real life" purposes as well?
*/
DBUG_EXECUTE_IF("fault_injection_registering_index", {
@@ -3110,7 +3322,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
DBUG_RETURN(1); /* all warnings issued */
}
- init(no_auto_events_arg, max_size_arg);
+ init(max_size_arg);
open_count++;
@@ -3134,11 +3346,10 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
write_file_name_to_index_file= 1;
}
- if (need_start_event && !no_auto_events)
{
/*
- In 4.x we set need_start_event=0 here, but in 5.0 we want a Start event
- even if this is not the very first binlog.
+ In 4.x we put a Start event only in the first binlog. But from 5.0 we
+ want a Start event even if this is not the very first binlog.
*/
Format_description_log_event s(BINLOG_VERSION);
/*
@@ -3165,6 +3376,101 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
if (s.write(&log_file))
goto err;
bytes_written+= s.data_written;
+
+ if (!is_relay_log)
+ {
+ char buf[FN_REFLEN];
+
+ /*
+ Output a Gtid_list_log_event at the start of the binlog file.
+
+ This is used to quickly determine which GTIDs are found in binlog
+ files earlier than this one, and which are found in this (or later)
+ binlogs.
+
+ The list gives a mapping from (domain_id, server_id) -> seq_no (so
+ this means that there is at most one entry for every unique pair
+ (domain_id, server_id) in the list). It indicates that this seq_no is
+ the last one found in an earlier binlog file for this (domain_id,
+ server_id) combination - so any higher seq_no should be searched for
+ from this binlog file, or a later one.
+
+ This allows to locate the binlog file containing a given GTID by
+ scanning backwards, reading just the Gtid_list_log_event at the
+ start of each file, and scanning only the relevant binlog file when
+ found, not all binlog files.
+
+ The existence of a given entry (domain_id, server_id, seq_no)
+ guarantees only that this seq_no will not be found in this or any
+ later binlog file. It does not guarantee that it can be found in an
+ earlier binlog file, for example the file may have been purged.
+
+ If there is no entry for a given (domain_id, server_id) pair, then
+ it means that no such GTID exists in any earlier binlog. It is
+ permissible to remove such pair from future Gtid_list_log_events
+ if all previous binlog files containing such GTIDs have been purged
+ (though such optimization is not performed at the time of this
+ writing). So if there is no entry for a given GTID it means that such
+ GTID should be searched for in this or a later binlog file, same as if
+ there had been an entry (domain_id, server_id, 0).
+ */
+
+ Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0);
+ if (gl_ev.write(&log_file))
+ goto err;
+
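To make the lookup described in the comment above concrete, here is a minimal, self-contained sketch (not part of this patch; the types and the function name are illustrative stand-ins): given the mapping carried by each binlog file's initial Gtid_list_log_event, scan the files newest to oldest and stop at the first file whose entry for (domain_id, server_id) is missing or smaller than the wanted seq_no, because no earlier file can then contain the GTID.

#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

struct gtid { uint32_t domain_id, server_id; uint64_t seq_no; };

// (domain_id, server_id) -> last seq_no present in *earlier* binlog files,
// i.e. what a file's initial Gtid_list_log_event carries.
using gtid_list = std::map<std::pair<uint32_t, uint32_t>, uint64_t>;

// files_oldest_first pairs each file name with its Gtid_list mapping.
// Returns the oldest file that can contain 'wanted', or "" if only already
// purged binlogs could have contained it.
std::string find_binlog_for_gtid(
    const std::vector<std::pair<std::string, gtid_list>> &files_oldest_first,
    const gtid &wanted)
{
  for (size_t i= files_oldest_first.size(); i-- > 0; )
  {
    const gtid_list &gl= files_oldest_first[i].second;
    auto it= gl.find(std::make_pair(wanted.domain_id, wanted.server_id));
    // No entry, or an entry below the wanted seq_no: no earlier file holds
    // the GTID, so scanning should start from this file.
    if (it == gl.end() || it->second < wanted.seq_no)
      return files_oldest_first[i].first;
  }
  return "";
}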
+ /* Output a binlog checkpoint event at the start of the binlog file. */
+
+ /*
+ Construct an entry in the binlog_xid_count_list for the new binlog
+ file (we will not link it into the list until we know the new file
+ is successfully created; otherwise we would have to remove it again
+ if creation failed, which gets tricky since other threads may have
+ seen the entry in the meantime - and we do not want to hold
+ LOCK_xid_list for long periods of time).
+
+ Write the current binlog checkpoint into the log, so XA recovery will
+ know from where to start recovery.
+ */
+ uint off= dirname_length(log_file_name);
+ uint len= strlen(log_file_name) - off;
+ char *entry_mem, *name_mem;
+ if (!(new_xid_list_entry = (xid_count_per_binlog *)
+ my_multi_malloc(MYF(MY_WME),
+ &entry_mem, sizeof(xid_count_per_binlog),
+ &name_mem, len,
+ NULL)))
+ goto err;
+ memcpy(name_mem, log_file_name+off, len);
+ new_xid_list_entry->binlog_name= name_mem;
+ new_xid_list_entry->binlog_name_len= len;
+ new_xid_list_entry->xid_count= 0;
+
+ /*
+ Find the name for the Initial binlog checkpoint.
+
+ Normally this will just be the first entry, as we delete entries
+ when their count drops to zero. But we scan the list to handle any
+ corner case, eg. for the first binlog file opened after startup, the
+ list will be empty.
+ */
+ mysql_mutex_lock(&LOCK_xid_list);
+ I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list);
+ while ((b= it++) && b->xid_count == 0)
+ ;
+ mysql_mutex_unlock(&LOCK_xid_list);
+ if (!b)
+ b= new_xid_list_entry;
+ strmake(buf, b->binlog_name, b->binlog_name_len);
+ Binlog_checkpoint_log_event ev(buf, len);
+ DBUG_EXECUTE_IF("crash_before_write_checkpoint_event",
+ flush_io_cache(&log_file);
+ mysql_file_sync(log_file.file, MYF(MY_WME));
+ DBUG_SUICIDE(););
+ if (ev.write(&log_file))
+ goto err;
+ bytes_written+= ev.data_written;
+ }
}
if (description_event_for_queue &&
description_event_for_queue->binlog_version>=4)
@@ -3235,6 +3541,42 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
#endif
}
}
+
+ if (!is_relay_log)
+ {
+ /*
+ Now the file was created successfully, so we can link in the entry for
+ the new binlog file in binlog_xid_count_list.
+ */
+ mysql_mutex_lock(&LOCK_xid_list);
+ ++current_binlog_id;
+ new_xid_list_entry->binlog_id= current_binlog_id;
+ /* Remove any initial entries with no pending XIDs. */
+ while ((b= binlog_xid_count_list.head()) && b->xid_count == 0)
+ my_free(binlog_xid_count_list.get());
+ binlog_xid_count_list.push_back(new_xid_list_entry);
+ mysql_mutex_unlock(&LOCK_xid_list);
+
+ /*
+ Now that we have synced a new binlog file with an initial Gtid_list
+ event, it is safe to delete the binlog state file. We will write out
+ a new, updated file at shutdown, and if we crash before we can recover
+ the state from the newly written binlog file.
+
+ Since the state file will contain out-of-date data as soon as the first
+ new GTID is binlogged, it is better to remove it, to avoid any risk of
+ accidentally reading incorrect data later.
+ */
+ if (!state_file_deleted)
+ {
+ char buf[FN_REFLEN];
+ fn_format(buf, opt_bin_logname, mysql_data_home, ".state",
+ MY_UNPACK_FILENAME);
+ my_delete(buf, MY_SYNC_DIR);
+ state_file_deleted= true;
+ }
+ }
+
log_state= LOG_OPENED;
#ifdef HAVE_REPLICATION
@@ -3253,6 +3595,8 @@ err:
Turning logging off for the whole duration of the MySQL server process. \
To turn it on again: fix the cause, \
shutdown the MySQL server and restart it.", name, errno);
+ if (new_xid_list_entry)
+ my_free(new_xid_list_entry);
if (file >= 0)
mysql_file_close(file, MYF(0));
close(LOG_CLOSE_INDEX);
@@ -3312,7 +3656,8 @@ static bool copy_up_file_and_fill(IO_CACHE *index_file, my_off_t offset)
if (!bytes_read)
break; // end of file
mysql_file_seek(file, offset-init_offset, MY_SEEK_SET, MYF(0));
- if (mysql_file_write(file, io_buf, bytes_read, MYF(MY_WME | MY_NABP)))
+ if (mysql_file_write(file, io_buf, bytes_read,
+ MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL)))
goto err;
}
/* The following will either truncate the file or fill the end with \n' */
@@ -3498,11 +3843,11 @@ err:
/**
Delete all logs referred to in the index file.
- Start writing to a new log file.
The new index file will only contain this file.
- @param thd Thread
+ @param thd Thread
+ @param create_new_log 1 if we should start writing to a new log file
@note
If not called from slave thread, write start event to new log
@@ -3513,7 +3858,8 @@ err:
1 error
*/
-bool MYSQL_BIN_LOG::reset_logs(THD* thd)
+bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
+ rpl_gtid *init_state, uint32 init_state_len)
{
LOG_INFO linfo;
bool error=0;
@@ -3521,7 +3867,34 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
const char* save_name;
DBUG_ENTER("reset_logs");
- ha_reset_logs(thd);
+ if (!is_relay_log)
+ {
+ if (init_state && !is_empty_state())
+ {
+ my_error(ER_BINLOG_MUST_BE_EMPTY, MYF(0));
+ DBUG_RETURN(1);
+ }
+
+ /*
+ Mark that a RESET MASTER is in progress.
+ This ensures that a binlog checkpoint will not try to write binlog
+ checkpoint events, which would be useless (as we are deleting the binlog
+ anyway) and could deadlock, as we are holding LOCK_log.
+
+ Wait for any mark_xid_done() calls that might be already running to
+ complete (mark_xid_done_waiting counter to drop to zero); we need to
+ do this before we take the LOCK_log to not deadlock.
+ */
+ mysql_mutex_lock(&LOCK_xid_list);
+ reset_master_pending++;
+ while (mark_xid_done_waiting > 0)
+ mysql_cond_wait(&COND_xid_list, &LOCK_xid_list);
+ mysql_mutex_unlock(&LOCK_xid_list);
+ }
+
+ DEBUG_SYNC(thd, "reset_logs_after_set_reset_master_pending");
+ if (thd)
+ ha_reset_logs(thd);
/*
We need to get both locks to be sure that no one is trying to
write to the index log file.
@@ -3529,6 +3902,50 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
mysql_mutex_lock(&LOCK_log);
mysql_mutex_lock(&LOCK_index);
+ if (!is_relay_log)
+ {
+ /*
+ We are going to nuke all binary log files.
+ Without binlog, we cannot XA recover prepared-but-not-committed
+ transactions in engines. So force a commit checkpoint first.
+
+ Note that we take and immediately release LOCK_commit_ordered. This has
+ the effect to ensure that any on-going group commit (in
+ trx_group_commit_leader()) has completed before we request the checkpoint,
+ due to the chaining of LOCK_log and LOCK_commit_ordered in that function.
+ (We are holding LOCK_log, so no new group commit can start).
+
+ Without this, it is possible (though perhaps unlikely) that the RESET
+ MASTER could run in-between the write to the binlog and the
+ commit_ordered() in the engine of some transaction, and then a crash
+ later would leave such transaction not recoverable.
+ */
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+
+ mark_xids_active(current_binlog_id, 1);
+ do_checkpoint_request(current_binlog_id);
+
+ /* Now wait for all checkpoint requests and pending unlog() to complete. */
+ mysql_mutex_lock(&LOCK_xid_list);
+ for (;;)
+ {
+ if (is_xidlist_idle_nolock())
+ break;
+ /*
+ Wait until signalled that one more binlog dropped to zero, then check
+ again.
+ */
+ mysql_cond_wait(&COND_xid_list, &LOCK_xid_list);
+ }
+
+ /*
+ Now all XIDs are fully flushed to disk, and we are holding LOCK_log so
+ no new ones will be written. So we can proceed to delete the logs.
+ */
+ mysql_mutex_unlock(&LOCK_xid_list);
+ }
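The take-and-immediately-release of LOCK_commit_ordered above is a small synchronization idiom worth spelling out: the empty critical section does no work, but it cannot be entered until any thread already inside its own LOCK_commit_ordered section has left it, so it acts as a barrier against in-flight group commits. A stand-alone sketch of the idiom with illustrative names (not this patch's code):

#include <mutex>

std::mutex commit_ordered_mutex;     // stand-in for LOCK_commit_ordered

// Does nothing itself, but cannot return before the current holder of the
// mutex (if any) has left its critical section.
void wait_for_current_commit_ordered_holder()
{
  std::lock_guard<std::mutex> barrier(commit_ordered_mutex);
}

Note that later arrivals can still enter afterwards; the idiom only waits out holders that were already inside, which is exactly what the RESET MASTER path needs before requesting the checkpoint while it keeps holding LOCK_log.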
+
/*
The following mutex is needed to ensure that no threads call
'delete thd' as we would then risk missing a 'rollback' from this
@@ -3567,7 +3984,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
{
if (my_errno == ENOENT)
{
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
linfo.log_file_name);
sql_print_information("Failed to delete file '%s'",
@@ -3577,7 +3994,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
}
else
{
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_PURGE_FATAL_ERR,
"a problem with deleting %s; "
"consider examining correspondence "
@@ -3592,13 +4009,21 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
break;
}
+ if (!is_relay_log)
+ {
+ if (init_state)
+ rpl_global_gtid_binlog_state.load(init_state, init_state_len);
+ else
+ rpl_global_gtid_binlog_state.reset();
+ }
+
/* Start logging with a new file */
close(LOG_CLOSE_INDEX | LOG_CLOSE_TO_BE_OPENED);
if ((error= my_delete(index_file_name, MYF(0)))) // Reset (open will update)
{
if (my_errno == ENOENT)
{
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
index_file_name);
sql_print_information("Failed to delete file '%s'",
@@ -3608,7 +4033,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
}
else
{
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_PURGE_FATAL_ERR,
"a problem with deleting %s; "
"consider examining correspondence "
@@ -3619,10 +4044,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
goto err;
}
}
- if (!thd->slave_thread)
- need_start_event=1;
- if (!open_index_file(index_file_name, 0, FALSE))
- if ((error= open(save_name, log_type, 0, io_cache_type, no_auto_events, max_size, 0, FALSE)))
+ if (create_new_log && !open_index_file(index_file_name, 0, FALSE))
+ if ((error= open(save_name, log_type, 0, io_cache_type, max_size, 0, FALSE)))
goto err;
my_free((void *) save_name);
@@ -3630,6 +4053,31 @@ err:
if (error == 1)
name= const_cast<char*>(save_name);
mysql_mutex_unlock(&LOCK_thread_count);
+
+ if (!is_relay_log)
+ {
+ xid_count_per_binlog *b;
+ /*
+ Remove all entries in the xid_count list except the last.
+ Normally we will just be deleting all the entries that we waited for to
+ drop to zero above. But if we fail during RESET MASTER for some reason
+ then we will not have created any new log file, and we may keep the last
+ of the old entries.
+ */
+ mysql_mutex_lock(&LOCK_xid_list);
+ for (;;)
+ {
+ b= binlog_xid_count_list.head();
+ DBUG_ASSERT(b /* List can never become empty. */);
+ if (b->binlog_id == current_binlog_id)
+ break;
+ DBUG_ASSERT(b->xid_count == 0);
+ my_free(binlog_xid_count_list.get());
+ }
+ reset_master_pending--;
+ mysql_mutex_unlock(&LOCK_xid_list);
+ }
+
mysql_mutex_unlock(&LOCK_index);
mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
@@ -3677,17 +4125,41 @@ err:
int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
{
- int error;
+ int error, errcode;
char *to_purge_if_included= NULL;
+ inuse_relaylog *ir;
ulonglong log_space_reclaimed= 0;
DBUG_ENTER("purge_first_log");
DBUG_ASSERT(is_open());
- DBUG_ASSERT(rli->slave_running == 1);
+ DBUG_ASSERT(rli->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT);
DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
mysql_mutex_lock(&LOCK_index);
- to_purge_if_included= my_strdup(rli->group_relay_log_name, MYF(0));
+
+ ir= rli->inuse_relaylog_list;
+ while (ir)
+ {
+ inuse_relaylog *next= ir->next;
+ if (!ir->completed || ir->dequeued_count < ir->queued_count)
+ {
+ included= false;
+ break;
+ }
+ if (!included && !strcmp(ir->name, rli->group_relay_log_name))
+ break;
+ if (!next)
+ {
+ rli->last_inuse_relaylog= NULL;
+ included= 1;
+ to_purge_if_included= my_strdup(ir->name, MYF(0));
+ }
+ rli->free_inuse_relaylog(ir);
+ ir= next;
+ }
+ rli->inuse_relaylog_list= ir;
+ if (ir)
+ to_purge_if_included= my_strdup(ir->name, MYF(0));
/*
Read the next log file name from the index file and pass it back to
@@ -3724,7 +4196,8 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
}
/* Store where we are in the new file for the execution thread */
- flush_relay_log_info(rli);
+ if (flush_relay_log_info(rli))
+ error= LOG_INFO_IO;
DBUG_EXECUTE_IF("crash_before_purge_logs", DBUG_SUICIDE(););
@@ -3740,11 +4213,13 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
* Need to update the log pos because purge logs has been called
* after initially fetching the log pos at the beginning of the method.
*/
- if((error=find_log_pos(&rli->linfo, rli->event_relay_log_name, 0)))
+ if ((errcode= find_log_pos(&rli->linfo, rli->event_relay_log_name, 0)))
{
char buff[22];
+ if (!error)
+ error= errcode;
sql_print_error("next log error: %d offset: %s log: %s included: %d",
- error,
+ errcode,
llstr(rli->linfo.index_file_offset,buff),
rli->group_relay_log_name,
included);
@@ -3834,8 +4309,7 @@ int MYSQL_BIN_LOG::purge_logs(const char *to_log,
if ((error=find_log_pos(&log_info, NullS, 0 /*no mutex*/)))
goto err;
while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)) &&
- !is_active(log_info.log_file_name) &&
- !log_in_use(log_info.log_file_name))
+ can_purge_log(log_info.log_file_name))
{
if ((error= register_purge_index_entry(log_info.log_file_name)))
{
@@ -4023,7 +4497,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space,
*/
if (thd)
{
- push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
log_info.log_file_name);
}
@@ -4038,7 +4512,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space,
*/
if (thd)
{
- push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_PURGE_FATAL_ERR,
"a problem with getting info on being purged %s; "
"consider examining correspondence "
@@ -4066,7 +4540,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space,
{
if (thd)
{
- push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_PURGE_FATAL_ERR,
"a problem with deleting %s and "
"reading the binlog index file",
@@ -4102,7 +4576,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space,
{
if (thd)
{
- push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
log_info.log_file_name);
}
@@ -4114,7 +4588,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space,
{
if (thd)
{
- push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_PURGE_FATAL_ERR,
"a problem with deleting %s; "
"consider examining correspondence "
@@ -4185,8 +4659,7 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time)
goto err;
while (strcmp(log_file_name, log_info.log_file_name) &&
- !is_active(log_info.log_file_name) &&
- !log_in_use(log_info.log_file_name))
+ can_purge_log(log_info.log_file_name))
{
if (!mysql_file_stat(m_key_file_log,
log_info.log_file_name, &stat_area, MYF(0)))
@@ -4205,7 +4678,7 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time)
*/
if (thd)
{
- push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_BINLOG_PURGE_FATAL_ERR,
"a problem with getting info on being purged %s; "
"consider examining correspondence "
@@ -4239,9 +4712,57 @@ err:
mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
+
+
+bool
+MYSQL_BIN_LOG::can_purge_log(const char *log_file_name)
+{
+ xid_count_per_binlog *b;
+
+ if (is_active(log_file_name))
+ return false;
+ mysql_mutex_lock(&LOCK_xid_list);
+ {
+ I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list);
+ while ((b= it++) &&
+ 0 != strncmp(log_file_name+dirname_length(log_file_name),
+ b->binlog_name, b->binlog_name_len))
+ ;
+ }
+ mysql_mutex_unlock(&LOCK_xid_list);
+ if (b)
+ return false;
+ return !log_in_use(log_file_name);
+}
#endif /* HAVE_REPLICATION */
+bool
+MYSQL_BIN_LOG::is_xidlist_idle()
+{
+ bool res;
+ mysql_mutex_lock(&LOCK_xid_list);
+ res= is_xidlist_idle_nolock();
+ mysql_mutex_unlock(&LOCK_xid_list);
+ return res;
+}
+
+
+bool
+MYSQL_BIN_LOG::is_xidlist_idle_nolock()
+{
+ xid_count_per_binlog *b;
+
+ I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list);
+ while ((b= it++))
+ {
+ if (b->xid_count > 0)
+ return false;
+ }
+ return true;
+}
+
+
/**
Create a new log file name.
@@ -4317,41 +4838,21 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
bool delay_close= false;
File old_file;
LINT_INIT(old_file);
-
DBUG_ENTER("MYSQL_BIN_LOG::new_file_impl");
- if (!is_open())
- {
- DBUG_PRINT("info",("log is closed"));
- DBUG_RETURN(error);
- }
if (need_lock)
mysql_mutex_lock(&LOCK_log);
- mysql_mutex_lock(&LOCK_index);
-
mysql_mutex_assert_owner(&LOCK_log);
- mysql_mutex_assert_owner(&LOCK_index);
- /*
- if binlog is used as tc log, be sure all xids are "unlogged",
- so that on recover we only need to scan one - latest - binlog file
- for prepared xids. As this is expected to be a rare event,
- simple wait strategy is enough. We're locking LOCK_log to be sure no
- new Xid_log_event's are added to the log (and prepared_xids is not
- increased), and waiting on COND_prep_xids for late threads to
- catch up.
- */
- if (prepared_xids)
+ if (!is_open())
{
- tc_log_page_waits++;
- mysql_mutex_lock(&LOCK_prep_xids);
- while (prepared_xids) {
- DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
- mysql_cond_wait(&COND_prep_xids, &LOCK_prep_xids);
- }
- mysql_mutex_unlock(&LOCK_prep_xids);
+ DBUG_PRINT("info",("log is closed"));
+ mysql_mutex_unlock(&LOCK_log);
+ DBUG_RETURN(error);
}
+ mysql_mutex_lock(&LOCK_index);
+
/* Reuse old name if not binlog and not update log */
new_name_ptr= name;
@@ -4366,7 +4867,6 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
if (log_type == LOG_BIN)
{
- if (!no_auto_events)
{
/*
We log the whole file name for log file as the user may decide
@@ -4441,7 +4941,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
/* reopen the binary log file. */
file_to_open= new_name_ptr;
error= open(old_name, log_type, new_name_ptr, io_cache_type,
- no_auto_events, max_size, 1, FALSE);
+ max_size, 1, FALSE);
}
/* handle reopening errors */
@@ -4485,20 +4985,31 @@ end:
new_name_ptr, errno);
}
+ mysql_mutex_unlock(&LOCK_index);
if (need_lock)
mysql_mutex_unlock(&LOCK_log);
- mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
-bool MYSQL_BIN_LOG::append(Log_event* ev)
+bool
+MYSQL_BIN_LOG::append(Log_event *ev)
{
- bool error = 0;
+ bool res;
mysql_mutex_lock(&LOCK_log);
+ res= append_no_lock(ev);
+ mysql_mutex_unlock(&LOCK_log);
+ return res;
+}
+
+
+bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev)
+{
+ bool error = 0;
DBUG_ENTER("MYSQL_BIN_LOG::append");
+ mysql_mutex_assert_owner(&LOCK_log);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
/*
Log_event::write() is smart enough to use my_b_write() or
@@ -4513,10 +5024,9 @@ bool MYSQL_BIN_LOG::append(Log_event* ev)
DBUG_PRINT("info",("max_size: %lu",max_size));
if (flush_and_sync(0))
goto err;
- if ((uint) my_b_append_tell(&log_file) > max_size)
+ if (my_b_append_tell(&log_file) > max_size)
error= new_file_without_locking();
err:
- mysql_mutex_unlock(&LOCK_log);
signal_update(); // Safe as we don't call close
DBUG_RETURN(error);
}
@@ -4544,7 +5054,7 @@ bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...)
DBUG_PRINT("info",("max_size: %lu",max_size));
if (flush_and_sync(0))
goto err;
- if ((uint) my_b_append_tell(&log_file) > max_size)
+ if (my_b_append_tell(&log_file) > max_size)
error= new_file_without_locking();
err:
if (!error)
@@ -4872,6 +5382,10 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
(long) table, table->s->table_name.str,
table->s->table_map_id));
+ /* Ensure that all events in a GTID group are in the same cache */
+ if (variables.option_bits & OPTION_GTID_BEGIN)
+ is_transactional= 1;
+
/* Pre-conditions */
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
@@ -4892,7 +5406,7 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
if (with_annotate && *with_annotate)
{
- Annotate_rows_log_event anno(current_thd, is_transactional, false);
+ Annotate_rows_log_event anno(table->in_use, is_transactional, false);
/* Annotate event should be written not more than once */
*with_annotate= 0;
if ((error= anno.write(file)))
@@ -5057,6 +5571,244 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
DBUG_RETURN(error);
}
+
+/* Generate a new global transaction ID, and write it to the binlog */
+
+bool
+MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
+ bool is_transactional, uint64 commit_id)
+{
+ rpl_gtid gtid;
+ uint32 domain_id= thd->variables.gtid_domain_id;
+ uint32 server_id= thd->variables.server_id;
+ uint64 seq_no= thd->variables.gtid_seq_no;
+ int err;
+ DBUG_ENTER("write_gtid_event");
+ DBUG_PRINT("enter", ("standalone: %d", standalone));
+
+ if (thd->variables.option_bits & OPTION_GTID_BEGIN)
+ {
+ DBUG_PRINT("error", ("OPTION_GTID_BEGIN is set. "
+ "Master and slave will have different GTID values"));
+ /* Reset the flag, as we will write out a GTID anyway */
+ thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
+ }
+
+ /*
+ Reset the session variable gtid_seq_no, to reduce the risk of accidentally
+ producing a duplicate GTID.
+ */
+ thd->variables.gtid_seq_no= 0;
+ if (seq_no != 0)
+ {
+ /* Use the specified sequence number. */
+ gtid.domain_id= domain_id;
+ gtid.server_id= server_id;
+ gtid.seq_no= seq_no;
+ err= rpl_global_gtid_binlog_state.update(&gtid, opt_gtid_strict_mode);
+ if (err && thd->get_stmt_da()->sql_errno()==ER_GTID_STRICT_OUT_OF_ORDER)
+ errno= ER_GTID_STRICT_OUT_OF_ORDER;
+ }
+ else
+ {
+ /* Allocate the next sequence number for the GTID. */
+ err= rpl_global_gtid_binlog_state.update_with_next_gtid(domain_id,
+ server_id, &gtid);
+ seq_no= gtid.seq_no;
+ }
+ if (err)
+ DBUG_RETURN(true);
+ thd->last_commit_gtid= gtid;
+
+ Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
+ LOG_EVENT_SUPPRESS_USE_F, is_transactional,
+ commit_id);
+
+ /* Write the event to the binary log. */
+ if (gtid_event.write(&mysql_bin_log.log_file))
+ DBUG_RETURN(true);
+ status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written);
+
+ DBUG_RETURN(false);
+}
+
+
+int
+MYSQL_BIN_LOG::write_state_to_file()
+{
+ File file_no;
+ IO_CACHE cache;
+ char buf[FN_REFLEN];
+ int err;
+ bool opened= false;
+ bool inited= false;
+
+ fn_format(buf, opt_bin_logname, mysql_data_home, ".state",
+ MY_UNPACK_FILENAME);
+ if ((file_no= mysql_file_open(key_file_binlog_state, buf,
+ O_RDWR|O_CREAT|O_TRUNC|O_BINARY,
+ MYF(MY_WME))) < 0)
+ {
+ err= 1;
+ goto err;
+ }
+ opened= true;
+ if ((err= init_io_cache(&cache, file_no, IO_SIZE, WRITE_CACHE, 0, 0,
+ MYF(MY_WME|MY_WAIT_IF_FULL))))
+ goto err;
+ inited= true;
+ if ((err= rpl_global_gtid_binlog_state.write_to_iocache(&cache)))
+ goto err;
+ inited= false;
+ if ((err= end_io_cache(&cache)))
+ goto err;
+ if ((err= mysql_file_sync(file_no, MYF(MY_WME|MY_SYNC_FILESIZE))))
+ goto err;
+ goto end;
+
+err:
+ sql_print_error("Error writing binlog state to file '%s'.\n", buf);
+ if (inited)
+ end_io_cache(&cache);
+end:
+ if (opened)
+ mysql_file_close(file_no, MYF(0));
+
+ return err;
+}
+
+
+/*
+ Initialize the binlog state from the master-bin.state file, at server startup.
+
+ Returns:
+ 0 for success.
+ 2 when the .state file did not exist.
+ 1 for other error.
+*/
+int
+MYSQL_BIN_LOG::read_state_from_file()
+{
+ File file_no;
+ IO_CACHE cache;
+ char buf[FN_REFLEN];
+ int err;
+ bool opened= false;
+ bool inited= false;
+
+ fn_format(buf, opt_bin_logname, mysql_data_home, ".state",
+ MY_UNPACK_FILENAME);
+ if ((file_no= mysql_file_open(key_file_binlog_state, buf,
+ O_RDONLY|O_BINARY, MYF(0))) < 0)
+ {
+ if (my_errno != ENOENT)
+ {
+ err= 1;
+ goto err;
+ }
+ else
+ {
+ /*
+ If the state file does not exist, this is the first server startup
+ with GTID enabled. So initialize to empty state.
+ */
+ rpl_global_gtid_binlog_state.reset();
+ err= 2;
+ goto end;
+ }
+ }
+ opened= true;
+ if ((err= init_io_cache(&cache, file_no, IO_SIZE, READ_CACHE, 0, 0,
+ MYF(MY_WME|MY_WAIT_IF_FULL))))
+ goto err;
+ inited= true;
+ if ((err= rpl_global_gtid_binlog_state.read_from_iocache(&cache)))
+ goto err;
+ goto end;
+
+err:
+ sql_print_error("Error reading binlog GTID state from file '%s'.\n", buf);
+end:
+ if (inited)
+ end_io_cache(&cache);
+ if (opened)
+ mysql_file_close(file_no, MYF(0));
+
+ return err;
+}
+
+
+int
+MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
+{
+ return rpl_global_gtid_binlog_state.get_most_recent_gtid_list(list, size);
+}
+
+
+bool
+MYSQL_BIN_LOG::append_state_pos(String *str)
+{
+ return rpl_global_gtid_binlog_state.append_pos(str);
+}
+
+
+bool
+MYSQL_BIN_LOG::append_state(String *str)
+{
+ return rpl_global_gtid_binlog_state.append_state(str);
+}
+
+
+bool
+MYSQL_BIN_LOG::is_empty_state()
+{
+ return (rpl_global_gtid_binlog_state.count() == 0);
+}
+
+
+bool
+MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id,
+ rpl_gtid *out_gtid)
+{
+ rpl_gtid *gtid;
+ if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id)))
+ *out_gtid= *gtid;
+ return gtid != NULL;
+}
+
+
+bool
+MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id,
+ rpl_gtid *out_gtid)
+{
+ rpl_gtid *found_gtid;
+
+ if ((found_gtid= rpl_global_gtid_binlog_state.find_most_recent(domain_id)))
+ {
+ *out_gtid= *found_gtid;
+ return true;
+ }
+
+ return false;
+}
+
+
+int
+MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no)
+{
+ return rpl_global_gtid_binlog_state.bump_seq_no_if_needed(domain_id, seq_no);
+}
+
+
+bool
+MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id, uint32 server_id,
+ uint64 seq_no)
+{
+ return rpl_global_gtid_binlog_state.check_strict_sequence(domain_id,
+ server_id, seq_no);
+}
+
+
/**
Write an event to the binary log. If with_annotate != NULL and
*with_annotate = TRUE write also Annotate_rows before the event
@@ -5067,11 +5819,21 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
{
THD *thd= event_info->thd;
bool error= 1;
- DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)");
binlog_cache_data *cache_data= 0;
bool is_trans_cache= FALSE;
bool using_trans= event_info->use_trans_cache();
bool direct= event_info->use_direct_logging();
+ ulong prev_binlog_id;
+ DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)");
+ LINT_INIT(prev_binlog_id);
+
+ if (thd->variables.option_bits & OPTION_GTID_BEGIN)
+ {
+ DBUG_PRINT("info", ("OPTION_GTID_BEGIN was set"));
+ /* Wait for commit from binary log before we commit */
+ direct= 0;
+ using_trans= 1;
+ }
if (thd->binlog_evt_union.do_union)
{
@@ -5120,9 +5882,26 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
if (direct)
{
+ int res;
+ uint64 commit_id= 0;
+ DBUG_PRINT("info", ("direct is set"));
+ if ((res= thd->wait_for_prior_commit()))
+ DBUG_RETURN(res);
file= &log_file;
my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log);
+ prev_binlog_id= current_binlog_id;
+ DBUG_EXECUTE_IF("binlog_force_commit_id",
+ {
+ const LEX_STRING name= { C_STRING_WITH_LEN("commit_id") };
+ bool null_value;
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars,
+ (uchar*) name.str, name.length);
+ commit_id= entry->val_int(&null_value);
+ });
+ if (write_gtid_event(thd, true, using_trans, commit_id))
+ goto err;
}
else
{
@@ -5266,7 +6045,7 @@ err:
mysql_mutex_unlock(&LOCK_log);
if (check_purge)
- purge();
+ checkpoint_and_purge(prev_binlog_id);
}
if (error)
@@ -5351,6 +6130,60 @@ bool general_log_write(THD *thd, enum enum_server_command command,
return FALSE;
}
+
+static void
+binlog_checkpoint_callback(void *cookie)
+{
+ MYSQL_BIN_LOG::xid_count_per_binlog *entry=
+ (MYSQL_BIN_LOG::xid_count_per_binlog *)cookie;
+ /*
+ For every supporting engine, we increment the xid_count and issue a
+ commit_checkpoint_request(). Then we can count when all
+ commit_checkpoint_notify() callbacks have occurred, and then log a new
+ binlog checkpoint event.
+ */
+ mysql_bin_log.mark_xids_active(entry->binlog_id, 1);
+}
+
+
+/*
+ Request a commit checkpoint from each supporting engine.
+ This must be called after each binlog rotate, and after LOCK_log has been
+ released. The xid_count value in the xid_count_per_binlog entry was
+ incremented by 1 and will be decremented in this function; this ensures
+ that the entry will not go away early despite LOCK_log not being held.
+*/
+void
+MYSQL_BIN_LOG::do_checkpoint_request(ulong binlog_id)
+{
+ xid_count_per_binlog *entry;
+
+ /*
+ Find the binlog entry, and invoke commit_checkpoint_request() on it in
+ each supporting storage engine.
+ */
+ mysql_mutex_lock(&LOCK_xid_list);
+ I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list);
+ do {
+ entry= it++;
+ DBUG_ASSERT(entry /* binlog_id is always somewhere in the list. */);
+ } while (entry->binlog_id != binlog_id);
+ mysql_mutex_unlock(&LOCK_xid_list);
+
+ ha_commit_checkpoint_request(entry, binlog_checkpoint_callback);
+ /*
+ When we rotated the binlog, we incremented xid_count to make sure the
+ entry would not go away until this point, where we have done all necessary
+ commit_checkpoint_request() calls.
+ So now we can (and must) decrease the count - when it reaches zero, we
+ will know that both all pending unlog() and all pending
+ commit_checkpoint_notify() calls are done, and we can log a new binlog
+ checkpoint.
+ */
+ mark_xid_done(binlog_id, true);
+}
+
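+/*
+  Typical call pattern (see rotate_and_purge() and trx_group_commit_leader()
+  further down): the current binlog id is saved in prev_binlog_id, rotate()
+  is called under LOCK_log and sets check_purge on success; once LOCK_log has
+  been released, the caller runs checkpoint_and_purge(prev_binlog_id), which
+  performs do_checkpoint_request() and then purge().
+*/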
+
/**
The method executes rotation when LOCK_log is already acquired
by the caller.
@@ -5359,6 +6192,15 @@ bool general_log_write(THD *thd, enum enum_server_command command,
@param check_purge is set to true if rotation took place
@note
+ Caller _must_ check the check_purge variable. If this is set, it means
+ that the binlog was rotated, and caller _must_ ensure that
+ do_checkpoint_request() is called later with the binlog_id of the rotated
+ binlog file. The call to do_checkpoint_request() must happen after
+ LOCK_log is released (which is why we cannot simply do it here).
+ Usually, checkpoint_and_purge() is appropriate, as it will both handle
+ the checkpointing and any needed purging of old logs.
+
+ @note
If rotation fails, for instance the server was unable
to create a new log file, we still try to write an
incident event to the current log.
@@ -5376,7 +6218,27 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge)
if (force_rotate || (my_b_tell(&log_file) >= (my_off_t) max_size))
{
+ ulong binlog_id= current_binlog_id;
+ /*
+ We rotate the binlog, so we need to start a commit checkpoint in all
+ supporting engines - when it finishes, we can log a new binlog checkpoint
+ event.
+
+ But we cannot start the checkpoint here - there could be a group commit
+ still in progress which needs to be included in the checkpoint, and
+ besides we do not want to do the (possibly expensive) checkpoint while
+ LOCK_log is held.
+
+ On the other hand, we must be sure that the xid_count entry for the
+ previous log does not go away until we start the checkpoint - which it
+ could do as it is no longer the most recent. So we increment xid_count
+ (to count the pending checkpoint request) - this will fix the entry in
+ place until we decrement again in do_checkpoint_request().
+ */
+ mark_xids_active(binlog_id, 1);
+
if ((error= new_file_without_locking()))
+ {
/**
Be conservative... There are possible lost events (eg,
failing to log the Execute_load_query_log_event
@@ -5389,7 +6251,14 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge)
if (!write_incident_already_locked(current_thd))
flush_and_sync(0);
- *check_purge= true;
+ /*
+      We failed to rotate, so we have to decrement back the xid_count that
+      we incremented before attempting the rotate.
+ */
+ mark_xid_done(binlog_id, false);
+ }
+ else
+ *check_purge= true;
}
DBUG_RETURN(error);
}
@@ -5417,6 +6286,13 @@ void MYSQL_BIN_LOG::purge()
#endif
}
+
+void MYSQL_BIN_LOG::checkpoint_and_purge(ulong binlog_id)
+{
+ do_checkpoint_request(binlog_id);
+ purge();
+}
+
/**
The method is a shortcut of @c rotate() and @c purge().
LOCK_log is acquired prior to rotate and is released after it.
@@ -5429,11 +6305,13 @@ void MYSQL_BIN_LOG::purge()
int MYSQL_BIN_LOG::rotate_and_purge(bool force_rotate)
{
int error= 0;
+ ulong prev_binlog_id;
DBUG_ENTER("MYSQL_BIN_LOG::rotate_and_purge");
bool check_purge= false;
//todo: fix the macro def and restore safe_mutex_assert_not_owner(&LOCK_log);
mysql_mutex_lock(&LOCK_log);
+ prev_binlog_id= current_binlog_id;
if ((error= rotate(force_rotate, &check_purge)))
check_purge= false;
/*
@@ -5443,7 +6321,7 @@ int MYSQL_BIN_LOG::rotate_and_purge(bool force_rotate)
mysql_mutex_unlock(&LOCK_log);
if (check_purge)
- purge();
+ checkpoint_and_purge(prev_binlog_id);
DBUG_RETURN(error);
}
@@ -5512,9 +6390,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
long val;
    ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN to end_log_pos
uchar header[LOG_EVENT_HEADER_LEN];
- ha_checksum crc= 0, crc_0= 0; // assignments to keep compiler happy
+ ha_checksum crc= 0, crc_0= 0;
my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF);
uchar buf[BINLOG_CHECKSUM_LEN];
+ DBUG_ENTER("MYSQL_BIN_LOG::write_cache");
// while there is just one alg the following must hold:
DBUG_ASSERT(!do_checksum ||
@@ -5536,9 +6415,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
group= (uint)my_b_tell(&log_file);
hdr_offs= carry= 0;
- if (do_checksum)
- crc= crc_0= my_checksum(0L, NULL, 0);
-
+
do
{
/*
@@ -5567,7 +6444,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
/* write the first half of the split header */
if (my_b_write(&log_file, header, carry))
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
status_var_add(thd->status_var.binlog_bytes_written, carry);
/*
@@ -5611,12 +6488,12 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
crc= my_checksum(crc, cache->read_pos, length);
remains -= length;
if (my_b_write(&log_file, cache->read_pos, length))
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
if (remains == 0)
{
int4store(buf, crc);
if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
crc= crc_0;
}
}
@@ -5643,7 +6520,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
DBUG_ASSERT(remains == 0);
if (my_b_write(&log_file, cache->read_pos, hdr_offs) ||
my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
crc= crc_0;
}
}
@@ -5675,12 +6552,12 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
length, &crc);
if (my_b_write(&log_file, ev,
remains == 0 ? event_len : length - hdr_offs))
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
if (remains == 0)
{
int4store(buf, crc);
if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
crc= crc_0; // crc is complete
}
}
@@ -5705,10 +6582,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
/* Write data to the binary log file */
DBUG_EXECUTE_IF("fail_binlog_write_1",
- errno= 28; return ER_ERROR_ON_WRITE;);
+ errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE););
if (!do_checksum)
if (my_b_write(&log_file, cache->read_pos, length))
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
status_var_add(thd->status_var.binlog_bytes_written, length);
cache->read_pos=cache->read_end; // Mark buffer used up
@@ -5718,7 +6595,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
DBUG_ASSERT(!do_checksum || remains == 0);
DBUG_ASSERT(!do_checksum || crc == crc_0);
- return 0; // All OK
+ DBUG_RETURN(0); // All OK
}
/*
@@ -5730,9 +6607,9 @@ int query_error_code(THD *thd, bool not_killed)
if (not_killed || (killed_mask_hard(thd->killed) == KILL_BAD_DATA))
{
- error= thd->is_error() ? thd->stmt_da->sql_errno() : 0;
+ error= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0;
- /* thd->stmt_da->sql_errno() might be ER_SERVER_SHUTDOWN or
+  /* thd->get_stmt_da()->sql_errno() might be ER_SERVER_SHUTDOWN or
     ER_QUERY_INTERRUPTED, so here we need to make sure that error
     is not set to one of these errors when the caller specified
     not_killed.
@@ -5774,11 +6651,13 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
uint error= 0;
my_off_t offset;
bool check_purge= false;
+ ulong prev_binlog_id;
DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
mysql_mutex_lock(&LOCK_log);
if (likely(is_open()))
{
+ prev_binlog_id= current_binlog_id;
if (!(error= write_incident_already_locked(thd)) &&
!(error= flush_and_sync(0)))
{
@@ -5798,7 +6677,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
mysql_mutex_unlock(&LOCK_log);
if (check_purge)
- purge();
+ checkpoint_and_purge(prev_binlog_id);
}
else
{
@@ -5808,6 +6687,45 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
DBUG_RETURN(error);
}
+void
+MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name,
+ uint len)
+{
+ my_off_t offset;
+ Binlog_checkpoint_log_event ev(name, len);
+ /*
+ Note that we must sync the binlog checkpoint to disk.
+ Otherwise a subsequent log purge could delete binlogs that XA recovery
+ thinks are needed (even though they are not really).
+ */
+ if (!ev.write(&log_file) && !flush_and_sync(0))
+ {
+ signal_update();
+ }
+ else
+ {
+ /*
+ If we fail to write the checkpoint event, something is probably really
+ bad with the binlog. We complain in the error log.
+
+ Note that failure to write binlog checkpoint does not compromise the
+ ability to do crash recovery - crash recovery will just have to scan a
+ bit more of the binlog than strictly necessary.
+ */
+ sql_print_error("Failed to write binlog checkpoint event to binary log\n");
+ }
+
+ offset= my_b_tell(&log_file);
+ /*
+ Take mutex to protect against a reader seeing partial writes of 64-bit
+ offset on 32-bit CPUs.
+ */
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ last_commit_pos_offset= offset;
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+}
+
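+/*
+  The "_already_locked" suffix refers to LOCK_log: this function is invoked
+  with LOCK_log held (see mark_xid_done() below, which takes LOCK_log before
+  calling it once the XID count of an older binlog file has dropped to zero).
+*/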
+
/**
Write a cached log entry to the binary log.
- To support transaction over replication, we wrap the transaction
@@ -5840,6 +6758,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
bool using_trx_cache)
{
group_commit_entry entry;
+ Ha_trx_info *ha_info;
DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
entry.thd= thd;
@@ -5848,20 +6767,16 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
entry.all= all;
entry.using_stmt_cache= using_stmt_cache;
entry.using_trx_cache= using_trx_cache;
+ entry.need_unlog= false;
+ ha_info= all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list;
+ for (; ha_info; ha_info= ha_info->next())
+ {
+ if (ha_info->is_started() && ha_info->ht() != binlog_hton &&
+ !ha_info->ht()->commit_checkpoint_request)
+ entry.need_unlog= true;
+ break;
+ }
- /*
- Log "BEGIN" at the beginning of every transaction. Here, a transaction is
- either a BEGIN..COMMIT block or a single statement in autocommit mode.
-
- Create the necessary events here, where we have the correct THD (and
- thread context).
-
- Due to group commit the actual writing to binlog may happen in a different
- thread.
- */
- Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE,
- TRUE, 0);
- entry.begin_event= &qinfo;
entry.end_event= end_ev;
if (cache_mngr->stmt_cache.has_incident() ||
cache_mngr->trx_cache.has_incident())
@@ -5877,45 +6792,343 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
}
}
-bool
-MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
+
+/*
+ Put a transaction that is ready to commit in the group commit queue.
+ The transaction is identified by the ENTRY object passed into this function.
+
+ To facilitate group commit for the binlog, we first queue up ourselves in
+ this function. Then later the first thread to enter the queue waits for
+ the LOCK_log mutex, and commits for everyone in the queue once it gets the
+ lock. Any other threads in the queue just wait for the first one to finish
+ the commit and wake them up. This way, all transactions in the queue get
+ committed in a single disk operation.
+
+  The main work in this function handles the case where the commit of one
+  transaction has been marked to wait for the commit of another transaction
+  to happen first. This is used to support in-order parallel replication,
+  where transactions can execute out-of-order but need to be committed in the
+  same order as they happened on the master. The waiting of one commit on
+  another needs to be integrated with the group commit queue, to ensure that
+  the waiting transaction can participate in the same group commit as the
+  waited-for transaction.
+
+ So when we put a transaction in the queue, we check if there were other
+ transactions already prepared to commit but just waiting for the first one
+ to commit. If so, we add those to the queue as well, transitively for all
+ waiters.
+
+ And if a transaction is marked to wait for a prior transaction, but that
+ prior transaction is already queued for group commit, then we can queue the
+ new transaction directly to participate in the group commit.
+
+ @retval < 0 Error
+ @retval > 0 If queued as the first entry in the queue (meaning this
+ is the leader)
+ @retval 0 Otherwise (queued as participant, leader handles the commit)
+*/
+
+int
+MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
{
+ group_commit_entry *entry, *orig_queue, *last;
+ wait_for_commit *cur;
+ wait_for_commit *wfc;
+ DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit");
+
+ /*
+ Check if we need to wait for another transaction to commit before us.
+
+ It is safe to do a quick check without lock first in the case where we do
+ not have to wait. But if the quick check shows we need to wait, we must do
+ another safe check under lock, to avoid the race where the other
+ transaction wakes us up between the check and the wait.
+ */
+ wfc= orig_entry->thd->wait_for_commit_ptr;
+ orig_entry->queued_by_other= false;
+ if (wfc && wfc->waitee)
+ {
+ mysql_mutex_lock(&wfc->LOCK_wait_commit);
+ /*
+ Do an extra check here, this time safely under lock.
+
+ If waitee->commit_started is set, it means that the transaction we need
+ to wait for has already queued up for group commit. In this case it is
+      safe for us to queue up immediately as well, increasing the
+      opportunities for group commit. The waitee has taken
+      LOCK_prepare_ordered before setting the flag, so there is no risk that
+      we can queue ahead of it.
+ */
+ if (wfc->waitee && !wfc->waitee->commit_started)
+ {
+ PSI_stage_info old_stage;
+ wait_for_commit *loc_waitee;
+
+ /*
+ By setting wfc->opaque_pointer to our own entry, we mark that we are
+ ready to commit, but waiting for another transaction to commit before
+ us.
+
+ This other transaction may then take over the commit process for us to
+ get us included in its own group commit. If this happens, the
+ queued_by_other flag is set.
+
+ Setting this flag may or may not be seen by the other thread, but we
+ are safe in any case: The other thread will set queued_by_other under
+      its LOCK_wait_commit, and we will not check queued_by_other until after
+      we have been woken up.
+ */
+ wfc->opaque_pointer= orig_entry;
+ DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
+ orig_entry->thd->ENTER_COND(&wfc->COND_wait_commit,
+ &wfc->LOCK_wait_commit,
+ &stage_waiting_for_prior_transaction_to_commit,
+ &old_stage);
+ while ((loc_waitee= wfc->waitee) && !orig_entry->thd->check_killed())
+ mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
+ wfc->opaque_pointer= NULL;
+ DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
+ orig_entry->queued_by_other));
+
+ if (loc_waitee)
+ {
+ /* Wait terminated due to kill. */
+ mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
+ if (loc_waitee->wakeup_subsequent_commits_running ||
+ orig_entry->queued_by_other)
+ {
+ /* Our waitee is already waking us up, so ignore the kill. */
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ do
+ {
+ mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
+ } while (wfc->waitee);
+ }
+ else
+ {
+        /* We were killed, so remove ourselves from the waitee's list. */
+ wfc->remove_from_list(&loc_waitee->subsequent_commits_list);
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ wfc->waitee= NULL;
+
+ orig_entry->thd->EXIT_COND(&old_stage);
+ /* Interrupted by kill. */
+ DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior_killed");
+ wfc->wakeup_error= orig_entry->thd->killed_errno();
+ if (!wfc->wakeup_error)
+ wfc->wakeup_error= ER_QUERY_INTERRUPTED;
+ my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0));
+ DBUG_RETURN(-1);
+ }
+ }
+ orig_entry->thd->EXIT_COND(&old_stage);
+ }
+ else
+ mysql_mutex_unlock(&wfc->LOCK_wait_commit);
+ }
/*
- To facilitate group commit for the binlog, we first queue up ourselves in
- the group commit queue. Then the first thread to enter the queue waits for
- the LOCK_log mutex, and commits for everyone in the queue once it gets the
- lock. Any other threads in the queue just wait for the first one to finish
- the commit and wake them up.
+ If the transaction we were waiting for has already put us into the group
+ commit queue (and possibly already done the entire binlog commit for us),
+ then there is nothing else to do.
*/
+ if (orig_entry->queued_by_other)
+ DBUG_RETURN(0);
+
+ if (wfc && wfc->wakeup_error)
+ {
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+ DBUG_RETURN(-1);
+ }
- entry->thd->clear_wakeup_ready();
+ /* Now enqueue ourselves in the group commit queue. */
+ DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue");
+ orig_entry->thd->clear_wakeup_ready();
mysql_mutex_lock(&LOCK_prepare_ordered);
- group_commit_entry *orig_queue= group_commit_queue;
- entry->next= orig_queue;
- group_commit_queue= entry;
+ orig_queue= group_commit_queue;
+
+ /*
+ Iteratively process everything added to the queue, looking for waiters,
+ and their waiters, and so on. If a waiter is ready to commit, we
+ immediately add it to the queue, and mark it as queued_by_other.
+
+ This would be natural to do with recursion, but we want to avoid
+ potentially unbounded recursion blowing the C stack, so we use the list
+ approach instead.
+
+ We keep a list of the group_commit_entry of all the waiters that need to
+ be processed. Initially this list contains only the entry passed into this
+ function.
+
+ We process entries in the list one by one. The element currently being
+ processed is pointed to by `entry`, and the element at the end of the list
+ is pointed to by `last` (we do not use NULL to terminate the list).
+
+ As we process an entry, any waiters for that entry are added at the end of
+  the list, to be processed in subsequent iterations. Then the entry is added
+ to the group_commit_queue. This continues until the list is exhausted,
+ with all entries ever added eventually processed.
+
+  The end result is a breadth-first traversal of the tree of waiters,
+ re-using the `next' pointers of the group_commit_entry objects in place of
+ extra stack space in a recursive traversal.
+
+ The temporary list linked through these `next' pointers is not used by the
+ caller or any other function; it only exists while doing the iterative
+  tree traversal. Afterwards, all the processed entries are linked into the
+ group_commit_queue.
+ */
- if (entry->cache_mngr->using_xa)
+ cur= wfc;
+ last= orig_entry;
+ entry= orig_entry;
+ for (;;)
{
- DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
- run_prepare_ordered(entry->thd, entry->all);
- DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
+ group_commit_entry *next_entry;
+
+ if (entry->cache_mngr->using_xa)
+ {
+ DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
+ run_prepare_ordered(entry->thd, entry->all);
+ DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
+ }
+
+ if (cur)
+ {
+ /*
+ Now that we have taken LOCK_prepare_ordered and will queue up in the
+ group commit queue, it is safe for following transactions to queue
+ themselves. We will grab here any transaction that is now ready to
+ queue up, but after that, more transactions may become ready while the
+ leader is waiting to start the group commit. So set the flag
+ `commit_started', so that later transactions can still participate in
+        the group commit.
+ */
+ cur->commit_started= true;
+
+ /*
+        Check if this transaction has other transactions waiting for it to
+ commit.
+
+ If so, process the waiting transactions, and their waiters and so on,
+ transitively.
+ */
+ if (cur->subsequent_commits_list)
+ {
+ wait_for_commit *waiter, **waiter_ptr;
+
+ mysql_mutex_lock(&cur->LOCK_wait_commit);
+ /*
+ Grab the list, now safely under lock, and process it if still
+ non-empty.
+ */
+ waiter= cur->subsequent_commits_list;
+ waiter_ptr= &cur->subsequent_commits_list;
+ while (waiter)
+ {
+ wait_for_commit *next_waiter= waiter->next_subsequent_commit;
+ group_commit_entry *entry2=
+ (group_commit_entry *)waiter->opaque_pointer;
+ if (entry2)
+ {
+ /*
+ This is another transaction ready to be written to the binary
+ log. We can put it into the queue directly, without needing a
+ separate context switch to the other thread. We just set a flag
+ so that the other thread will know when it wakes up that it was
+ already processed.
+
+ So remove it from the list of our waiters, and instead put it at
+ the end of the list to be processed in a subsequent iteration of
+ the outer loop.
+ */
+ *waiter_ptr= next_waiter;
+ entry2->queued_by_other= true;
+ last->next= entry2;
+ last= entry2;
+ /*
+ As a small optimisation, we do not actually need to set
+ entry2->next to NULL, as we can use the pointer `last' to check
+ for end-of-list.
+ */
+ }
+ else
+ {
+ /*
+ This transaction is not ready to participate in the group commit
+ yet, so leave it in the waiter list. It might join the group
+ commit later, if it completes soon enough to do so (it will see
+              our wfc->commit_started flag set), or it will be part of a
+              later group commit.
+ */
+ waiter_ptr= &waiter->next_subsequent_commit;
+ }
+ waiter= next_waiter;
+ }
+ mysql_mutex_unlock(&cur->LOCK_wait_commit);
+ }
+ }
+
+ /*
+      Handle the heuristic that if another transaction is waiting for this
+      transaction (or starts doing so later), then we want to trigger group
+      commit immediately, without waiting for the binlog_commit_wait_usec
+      timeout to expire.
+ */
+ entry->thd->waiting_on_group_commit= true;
+
+ /* Add the entry to the group commit queue. */
+ next_entry= entry->next;
+ entry->next= group_commit_queue;
+ group_commit_queue= entry;
+ if (entry == last)
+ break;
+ /*
+ Move to the next entry in the flattened list of waiting transactions
+ that still need to be processed transitively.
+ */
+ entry= next_entry;
+ DBUG_ASSERT(entry != NULL);
+ cur= entry->thd->wait_for_commit_ptr;
}
+
+ if (opt_binlog_commit_wait_count > 0 && orig_queue != NULL)
+ mysql_cond_signal(&COND_prepare_ordered);
mysql_mutex_unlock(&LOCK_prepare_ordered);
- DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered");
+ DEBUG_SYNC(orig_entry->thd, "commit_after_release_LOCK_prepare_ordered");
+
+ DBUG_PRINT("info", ("Queued for group commit as %s\n",
+ (orig_queue == NULL) ? "leader" : "participant"));
+ DBUG_RETURN(orig_queue == NULL);
+}
+
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
+{
+ int is_leader= queue_for_group_commit(entry);
/*
- The first in the queue handle group commit for all; the others just wait
+ The first in the queue handles group commit for all; the others just wait
to be signalled when group commit is done.
*/
- if (orig_queue != NULL)
+ if (is_leader < 0)
+ return true; /* Error */
+ else if (is_leader)
+ trx_group_commit_leader(entry);
+ else if (!entry->queued_by_other)
entry->thd->wait_for_wakeup_ready();
else
- trx_group_commit_leader(entry);
+ {
+ /*
+ If we were queued by another prior commit, then we are woken up
+ only when the leader has already completed the commit for us.
+ So nothing to do here then.
+ */
+ }
if (!opt_optimize_thread_scheduling)
{
/* For the leader, trx_group_commit_leader() already took the lock. */
- if (orig_queue != NULL)
+ if (!is_leader)
mysql_mutex_lock(&LOCK_commit_ordered);
DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered");
@@ -5931,15 +7144,41 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered");
}
mysql_mutex_unlock(&LOCK_commit_ordered);
+ entry->thd->wakeup_subsequent_commits(entry->error);
if (next)
{
- next->thd->signal_wakeup_ready();
+ /*
+ Wake up the next thread in the group commit.
+
+ The next thread can be waiting in two different ways, depending on
+        whether it put itself in the queue, or whether we put it in the queue
+        because it had to wait for us to commit first.
+
+ So execute the appropriate wakeup, identified by the queued_by_other
+ field.
+ */
+ if (next->queued_by_other)
+ next->thd->wait_for_commit_ptr->wakeup(entry->error);
+ else
+ next->thd->signal_wakeup_ready();
+ }
+ else
+ {
+ /*
+ If we rotated the binlog, and if we are using the unoptimized thread
+ scheduling where every thread runs its own commit_ordered(), then we
+ must do the commit checkpoint and log purge here, after all
+ commit_ordered() calls have finished, and locks have been released.
+ */
+ if (entry->check_purge)
+ checkpoint_and_purge(entry->binlog_id);
}
+
}
if (likely(!entry->error))
- return 0;
+ return entry->thd->wait_for_prior_commit();
switch (entry->error)
{
@@ -5966,8 +7205,9 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
we need to mark it as not needed for recovery (unlog() is not called
for a transaction if log_xid() fails).
*/
- if (entry->cache_mngr->using_xa && entry->cache_mngr->xa_xid)
- mark_xid_done();
+ if (entry->cache_mngr->using_xa && entry->cache_mngr->xa_xid &&
+ entry->cache_mngr->need_unlog)
+ mark_xid_done(entry->cache_mngr->binlog_id, true);
return 1;
}
@@ -5987,32 +7227,49 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{
uint xid_count= 0;
my_off_t UNINIT_VAR(commit_offset);
- group_commit_entry *current;
- group_commit_entry *last_in_queue;
+ group_commit_entry *current, *last_in_queue;
group_commit_entry *queue= NULL;
bool check_purge= false;
+ ulong binlog_id;
+ uint64 commit_id;
DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader");
+ LINT_INIT(binlog_id);
- DBUG_ASSERT(is_open());
- if (likely(is_open())) // Should always be true
{
+ DBUG_EXECUTE_IF("inject_binlog_commit_before_get_LOCK_log",
+ DBUG_ASSERT(!debug_sync_set_action(leader->thd, STRING_WITH_LEN
+ ("commit_before_get_LOCK_log SIGNAL waiting WAIT_FOR cont TIMEOUT 1")));
+ );
/*
Lock the LOCK_log(), and once we get it, collect any additional writes
that queued up while we were waiting.
*/
+ DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_log");
mysql_mutex_lock(&LOCK_log);
DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
mysql_mutex_lock(&LOCK_prepare_ordered);
+ if (opt_binlog_commit_wait_count)
+ wait_for_sufficient_commits();
+ /*
+ Note that wait_for_sufficient_commits() may have released and
+ re-acquired the LOCK_log and LOCK_prepare_ordered if it needed to wait.
+ */
current= group_commit_queue;
group_commit_queue= NULL;
mysql_mutex_unlock(&LOCK_prepare_ordered);
+ binlog_id= current_binlog_id;
/* As the queue is in reverse order of entering, reverse it. */
last_in_queue= current;
while (current)
{
group_commit_entry *next= current->next;
+ /*
+ Now that group commit is started, we can clear the flag; there is no
+ longer any use in waiters on this commit trying to trigger it early.
+ */
+ current->thd->waiting_on_group_commit= false;
current->next= queue;
queue= current;
current= next;
@@ -6020,7 +7277,21 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
DBUG_ASSERT(leader == queue /* the leader should be first in queue */);
/* Now we have in queue the list of transactions to be committed in order. */
+ }
+ DBUG_ASSERT(is_open());
+ if (likely(is_open())) // Should always be true
+ {
+ commit_id= (last_in_queue == leader ? 0 : (uint64)leader->thd->query_id);
+ DBUG_EXECUTE_IF("binlog_force_commit_id",
+ {
+ const LEX_STRING name= { C_STRING_WITH_LEN("commit_id") };
+ bool null_value;
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&leader->thd->user_vars,
+ (uchar*) name.str, name.length);
+ commit_id= entry->val_int(&null_value);
+ });
/*
Commit every transaction in the queue.
@@ -6041,13 +7312,31 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty());
- current->error= write_transaction_or_stmt(current);
+ if ((current->error= write_transaction_or_stmt(current, commit_id)))
+ current->commit_errno= errno;
strmake_buf(cache_mngr->last_commit_pos_file, log_file_name);
commit_offset= my_b_write_tell(&log_file);
cache_mngr->last_commit_pos_offset= commit_offset;
if (cache_mngr->using_xa && cache_mngr->xa_xid)
- xid_count++;
+ {
+ /*
+ If all storage engines support commit_checkpoint_request(), then we
+ do not need to keep track of when this XID is durably committed.
+ Instead we will just ask the storage engine to durably commit all its
+ XIDs when we rotate a binlog file.
+ */
+ if (current->need_unlog)
+ {
+ xid_count++;
+ cache_mngr->need_unlog= true;
+ cache_mngr->binlog_id= binlog_id;
+ }
+ else
+ cache_mngr->need_unlog= false;
+
+ cache_mngr->delayed_error= false;
+ }
}
bool synced= 0;
@@ -6090,33 +7379,37 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
}
/*
- if any commit_events are Xid_log_event, increase the number of
- prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated
- if there're prepared xids in it - see the comment in new_file() for
- an explanation.
- If no Xid_log_events (then it's all Query_log_event) rotate binlog,
- if necessary.
+ If any commit_events are Xid_log_event, increase the number of pending
+    XIDs in the current binlog (it's decreased in ::unlog()). When the count in
+ a (not active) binlog file reaches zero, we know that it is no longer
+ needed in XA recovery, and we can log a new binlog checkpoint event.
*/
if (xid_count > 0)
{
- mark_xids_active(xid_count);
+ mark_xids_active(binlog_id, xid_count);
}
- else
+
+ if (rotate(false, &check_purge))
{
- if (rotate(false, &check_purge))
- {
- /*
- If we fail to rotate, which thread should get the error?
- We give the error to the *last* transaction thread; that seems to
- make the most sense, as it was the last to write to the log.
- */
- last_in_queue->error= ER_ERROR_ON_WRITE;
- last_in_queue->commit_errno= errno;
- check_purge= false;
- }
- /* In case of binlog rotate, update the correct current binlog offset. */
- commit_offset= my_b_write_tell(&log_file);
+ /*
+ If we fail to rotate, which thread should get the error?
+ We give the error to the leader, as any my_error() thrown inside
+ rotate() will have been registered for the leader THD.
+
+ However we must not return error from here - that would cause
+ ha_commit_trans() to abort and rollback the transaction, which would
+ leave an inconsistent state with the transaction committed in the
+ binlog but rolled back in the engine.
+
+ Instead set a flag so that we can return error later, from unlog(),
+ when the transaction has been safely committed in the engine.
+ */
+ leader->cache_mngr->delayed_error= true;
+ my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, errno);
+ check_purge= false;
}
+ /* In case of binlog rotate, update the correct current binlog offset. */
+ commit_offset= my_b_write_tell(&log_file);
}
DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
@@ -6130,9 +7423,6 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
mysql_mutex_unlock(&LOCK_log);
- if (check_purge)
- purge();
-
DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log");
++num_group_commits;
@@ -6150,6 +7440,15 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
mysql_cond_wait(&COND_queue_busy, &LOCK_commit_ordered);
group_commit_queue_busy= TRUE;
+ /*
+    Set these so that the caller can run checkpoint_and_purge() from the
+    last thread in the queue.
+    (When using optimized thread scheduling, we run checkpoint_and_purge()
+    in this function, so the caller does not need to, and we need not set
+    these values).
+ */
+ last_in_queue->check_purge= check_purge;
+ last_in_queue->binlog_id= binlog_id;
+
/* Note that we return with LOCK_commit_ordered locked! */
DBUG_VOID_RETURN;
}
@@ -6165,8 +7464,10 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered");
++num_commits;
- if (current->cache_mngr->using_xa && !current->error)
+ if (current->cache_mngr->using_xa && !current->error &&
+ DBUG_EVALUATE_IF("skip_commit_ordered", 0, 1))
run_commit_ordered(current->thd, current->all);
+ current->thd->wakeup_subsequent_commits(current->error);
/*
Careful not to access current->next after waking up the other thread! As
@@ -6174,32 +7475,40 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
next= current->next;
if (current != leader) // Don't wake up ourself
- current->thd->signal_wakeup_ready();
+ {
+ if (current->queued_by_other)
+ current->thd->wait_for_commit_ptr->wakeup(current->error);
+ else
+ current->thd->signal_wakeup_ready();
+ }
current= next;
}
DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered");
mysql_mutex_unlock(&LOCK_commit_ordered);
+ DEBUG_SYNC(leader->thd, "commit_after_group_release_commit_ordered");
+
+ if (check_purge)
+ checkpoint_and_purge(binlog_id);
DBUG_VOID_RETURN;
}
int
-MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry)
+MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
+ uint64 commit_id)
{
binlog_cache_mngr *mngr= entry->cache_mngr;
+ DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_or_stmt");
- if (entry->begin_event->write(&log_file))
- return ER_ERROR_ON_WRITE;
- status_var_add(entry->thd->status_var.binlog_bytes_written,
- entry->begin_event->data_written);
+ if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id))
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
if (entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE)))
{
entry->error_cache= &mngr->stmt_cache.cache_log;
- entry->commit_errno= errno;
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
}
if (entry->using_trx_cache && !mngr->trx_cache.empty())
@@ -6219,16 +7528,21 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry)
if (write_cache(entry->thd, mngr->get_binlog_cache_log(TRUE)))
{
entry->error_cache= &mngr->trx_cache.cache_log;
- entry->commit_errno= errno;
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
}
}
+ DBUG_EXECUTE_IF("inject_error_writing_xid",
+ {
+ entry->error_cache= NULL;
+ errno= 28;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
+ });
+
if (entry->end_event->write(&log_file))
{
entry->error_cache= NULL;
- entry->commit_errno= errno;
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
}
status_var_add(entry->thd->status_var.binlog_bytes_written,
entry->end_event->data_written);
@@ -6238,27 +7552,156 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry)
if (entry->incident_event->write(&log_file))
{
entry->error_cache= NULL;
- entry->commit_errno= errno;
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
}
}
if (mngr->get_binlog_cache_log(FALSE)->error) // Error on read
{
entry->error_cache= &mngr->stmt_cache.cache_log;
- entry->commit_errno= errno;
- return ER_ERROR_ON_READ;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
}
if (mngr->get_binlog_cache_log(TRUE)->error) // Error on read
{
entry->error_cache= &mngr->trx_cache.cache_log;
- entry->commit_errno= errno;
- return ER_ERROR_ON_READ;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
}
- return 0;
+ DBUG_RETURN(0);
}
+
+/*
+ Wait for sufficient commits to queue up for group commit, according to the
+ values of binlog_commit_wait_count and binlog_commit_wait_usec.
+
+ Note that this function may release and re-acquire LOCK_log and
+ LOCK_prepare_ordered if it needs to wait.
+*/
+
+void
+MYSQL_BIN_LOG::wait_for_sufficient_commits()
+{
+ size_t count;
+ group_commit_entry *e;
+ group_commit_entry *last_head;
+ struct timespec wait_until;
+
+ mysql_mutex_assert_owner(&LOCK_log);
+ mysql_mutex_assert_owner(&LOCK_prepare_ordered);
+
+ for (e= last_head= group_commit_queue, count= 0; e; e= e->next)
+ {
+ if (++count >= opt_binlog_commit_wait_count)
+ {
+ group_commit_trigger_count++;
+ return;
+ }
+ if (unlikely(e->thd->has_waiter))
+ {
+ group_commit_trigger_lock_wait++;
+ return;
+ }
+ }
+
+ mysql_mutex_unlock(&LOCK_log);
+ set_timespec_nsec(wait_until, (ulonglong)1000*opt_binlog_commit_wait_usec);
+
+ for (;;)
+ {
+ int err;
+ group_commit_entry *head;
+
+ err= mysql_cond_timedwait(&COND_prepare_ordered, &LOCK_prepare_ordered,
+ &wait_until);
+ if (err == ETIMEDOUT)
+ {
+ group_commit_trigger_timeout++;
+ break;
+ }
+ if (unlikely(last_head->thd->has_waiter))
+ {
+ group_commit_trigger_lock_wait++;
+ break;
+ }
+ head= group_commit_queue;
+ for (e= head; e && e != last_head; e= e->next)
+ {
+ ++count;
+ if (unlikely(e->thd->has_waiter))
+ {
+ group_commit_trigger_lock_wait++;
+ goto after_loop;
+ }
+ }
+ if (count >= opt_binlog_commit_wait_count)
+ {
+ group_commit_trigger_count++;
+ break;
+ }
+ last_head= head;
+ }
+after_loop:
+
+ /*
+ We must not wait for LOCK_log while holding LOCK_prepare_ordered.
+ LOCK_log can be held for long periods (eg. we do I/O under it), while
+ LOCK_prepare_ordered must only be held for short periods.
+
+ In addition, waiting for LOCK_log while holding LOCK_prepare_ordered would
+ violate locking order of LOCK_log-before-LOCK_prepare_ordered. This could
+ cause SAFEMUTEX warnings (even if it cannot actually deadlock with current
+ code, as there can be at most one group commit leader thread at a time).
+
+    So release and re-acquire LOCK_prepare_ordered if we need to wait for
+    LOCK_log.
+ */
+ if (mysql_mutex_trylock(&LOCK_log))
+ {
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+ mysql_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ }
+}
+
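+/*
+  Tuning example (the values here are purely illustrative): with
+  opt_binlog_commit_wait_count= 12 and opt_binlog_commit_wait_usec= 100000,
+  the group commit leader above waits at most ~100ms for up to 12
+  transactions to queue up. Whichever trigger fires first is counted in
+  group_commit_trigger_count, group_commit_trigger_timeout or
+  group_commit_trigger_lock_wait (the latter when a queued transaction has a
+  waiter and the delay is cut short).
+*/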
+
+void
+MYSQL_BIN_LOG::binlog_trigger_immediate_group_commit()
+{
+ group_commit_entry *head;
+ mysql_mutex_assert_owner(&LOCK_prepare_ordered);
+ head= group_commit_queue;
+ if (head)
+ {
+ head->thd->has_waiter= true;
+ mysql_cond_signal(&COND_prepare_ordered);
+ }
+}
+
+
+/*
+ This function is called when a transaction T1 goes to wait for another
+ transaction T2. It is used to cut short any binlog group commit delay from
+ --binlog-commit-wait-count in the case where another transaction is stalled
+ on the wait due to conflicting row locks.
+
+ If T2 is already ready to group commit, any waiting group commit will be
+ signalled to proceed immediately. Otherwise, a flag will be set in T2, and
+ when T2 later becomes ready, immediate group commit will be triggered.
+*/
+void
+binlog_report_wait_for(THD *thd1, THD *thd2)
+{
+ if (opt_binlog_commit_wait_count == 0)
+ return;
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ thd2->has_waiter= true;
+ if (thd2->waiting_on_group_commit)
+ mysql_bin_log.binlog_trigger_immediate_group_commit();
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+}
+
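+/*
+  Example scenario: T2 is already queued for group commit, but the leader is
+  delaying the commit waiting for --binlog-commit-wait-count transactions.
+  T1 now blocks on a row lock held by T2. Reporting the wait here lets the
+  pending group commit start at once, so that T2 commits and releases its
+  locks instead of stalling T1 for the full binlog_commit_wait_usec timeout.
+*/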
+
/**
Wait until we get a signal that the relay log has been updated.
@@ -6272,15 +7715,14 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry)
void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd)
{
- const char *old_msg;
+ PSI_stage_info old_stage;
DBUG_ENTER("wait_for_update_relay_log");
- old_msg= thd->enter_cond(&update_cond, &LOCK_log,
- "Slave has read all relay log; "
- "waiting for the slave I/O "
- "thread to update it" );
+ thd->ENTER_COND(&update_cond, &LOCK_log,
+ &stage_slave_has_read_all_relay_log,
+ &old_stage);
mysql_cond_wait(&update_cond, &LOCK_log);
- thd->exit_cond(old_msg);
+ thd->EXIT_COND(&old_stage);
DBUG_VOID_RETURN;
}
@@ -6335,12 +7777,16 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
void MYSQL_BIN_LOG::close(uint exiting)
{ // One can't set log_type here!
+ bool failed_to_save_state= false;
DBUG_ENTER("MYSQL_BIN_LOG::close");
DBUG_PRINT("enter",("exiting: %d", (int) exiting));
+
+ mysql_mutex_assert_owner(&LOCK_log);
+
if (log_state == LOG_OPENED)
{
#ifdef HAVE_REPLICATION
- if (log_type == LOG_BIN && !no_auto_events &&
+ if (log_type == LOG_BIN &&
(exiting & LOG_CLOSE_STOP_EVENT))
{
Stop_log_event s;
@@ -6352,6 +7798,27 @@ void MYSQL_BIN_LOG::close(uint exiting)
s.write(&log_file);
bytes_written+= s.data_written;
signal_update();
+
+ /*
+ When we shut down server, write out the binlog state to a separate
+ file so we do not have to scan an entire binlog file to recover it
+ at next server start.
+
+ Note that this must be written and synced to disk before marking the
+ last binlog file as "not crashed".
+ */
+ if (!is_relay_log && write_state_to_file())
+ {
+ sql_print_error("Failed to save binlog GTID state during shutdown. "
+ "Binlog will be marked as crashed, so that crash "
+ "recovery can recover the state at next server "
+ "startup.");
+ /*
+ Leave binlog file marked as crashed, so we can recover state by
+ scanning it now that we failed to write out the state properly.
+ */
+ failed_to_save_state= true;
+ }
}
#endif /* HAVE_REPLICATION */
@@ -6360,7 +7827,8 @@ void MYSQL_BIN_LOG::close(uint exiting)
&& !(exiting & LOG_CLOSE_DELAYED_CLOSE))
{
my_off_t org_position= mysql_file_tell(log_file.file, MYF(0));
- clear_inuse_flag_when_closing(log_file.file);
+ if (!failed_to_save_state)
+ clear_inuse_flag_when_closing(log_file.file);
/*
Restore position so that anything we have in the IO_cache is written
to the correct position.
@@ -6564,7 +8032,7 @@ static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff,
DBUG_ENTER("print_buffer_to_nt_eventlog");
/* Add ending CR/LF's to string, overwrite last chars if necessary */
- strmov(buffptr+min(length, buffLen-5), "\r\n\r\n");
+ strmov(buffptr+MY_MIN(length, buffLen-5), "\r\n\r\n");
setup_windows_event_source();
if ((event= RegisterEventSource(NULL,"MySQL")))
@@ -6598,16 +8066,33 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer,
time_t skr;
struct tm tm_tmp;
struct tm *start;
+ THD *thd;
+ int tag_length= 0;
+ char tag[NAME_LEN];
DBUG_ENTER("print_buffer_to_file");
DBUG_PRINT("enter",("buffer: %s", buffer));
+ if (mysqld_server_initialized && (thd= current_thd))
+ {
+ if (thd->connection_name.length)
+ {
+ /*
+ Add tag for slaves so that the user can see from which connection
+ the error originates.
+ */
+ tag_length= my_snprintf(tag, sizeof(tag), ER(ER_MASTER_LOG_PREFIX),
+ (int) thd->connection_name.length,
+ thd->connection_name.str);
+ }
+ }
+
mysql_mutex_lock(&LOCK_error_log);
skr= my_time(0);
localtime_r(&skr, &tm_tmp);
start=&tm_tmp;
- fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d [%s] %.*s\n",
+ fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d [%s] %.*s%.*s\n",
start->tm_year % 100,
start->tm_mon+1,
start->tm_mday,
@@ -6616,6 +8101,7 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer,
start->tm_sec,
(level == ERROR_LEVEL ? "ERROR" : level == WARNING_LEVEL ?
"Warning" : "Note"),
+ tag_length, tag,
(int) length, buffer);
fflush(stderr);
@@ -6761,6 +8247,9 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
mysql_mutex_unlock(&LOCK_prepare_ordered);
}
+ if (thd->wait_for_prior_commit())
+ return 0;
+
cookie= 0;
if (xid)
cookie= log_one_transaction(xid);
@@ -6974,6 +8463,8 @@ int TC_LOG_MMAP::open(const char *opt_name)
mysql_mutex_init(key_LOCK_sync, &LOCK_sync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_active, &LOCK_active, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_pool, &LOCK_pool, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_pending_checkpoint, &LOCK_pending_checkpoint,
+ MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_active, &COND_active, 0);
mysql_cond_init(key_COND_pool, &COND_pool, 0);
mysql_cond_init(key_TC_LOG_MMAP_COND_queue_busy, &COND_queue_busy, 0);
@@ -7224,17 +8715,93 @@ int TC_LOG_MMAP::sync()
return err;
}
+static void
+mmap_do_checkpoint_callback(void *data)
+{
+ TC_LOG_MMAP::pending_cookies *pending=
+ static_cast<TC_LOG_MMAP::pending_cookies *>(data);
+ ++pending->pending_count;
+}
+
+int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid)
+{
+ pending_cookies *full_buffer= NULL;
+ DBUG_ASSERT(*(my_xid *)(data+cookie) == xid);
+
+ /*
+ Do not delete the entry immediately, as there may be participating storage
+ engines which implement commit_checkpoint_request(), and thus have not yet
+ flushed the commit durably to disk.
+
+ Instead put it in a queue - and periodically, we will request a checkpoint
+ from all engines and delete a whole batch at once.
+ */
+ mysql_mutex_lock(&LOCK_pending_checkpoint);
+ if (pending_checkpoint == NULL)
+ {
+ uint32 size= sizeof(*pending_checkpoint);
+ if (!(pending_checkpoint=
+ (pending_cookies *)my_malloc(size, MYF(MY_ZEROFILL))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), size);
+ mysql_mutex_unlock(&LOCK_pending_checkpoint);
+ return 1;
+ }
+ }
+
+ pending_checkpoint->cookies[pending_checkpoint->count++]= cookie;
+ if (pending_checkpoint->count == sizeof(pending_checkpoint->cookies) /
+ sizeof(pending_checkpoint->cookies[0]))
+ {
+ full_buffer= pending_checkpoint;
+ pending_checkpoint= NULL;
+ }
+ mysql_mutex_unlock(&LOCK_pending_checkpoint);
+
+ if (full_buffer)
+ {
+ /*
+ We do an extra increment and notify here - this ensures that
+ things work also if there are no engines at all that support
+ commit_checkpoint_request.
+ */
+ ++full_buffer->pending_count;
+ ha_commit_checkpoint_request(full_buffer, mmap_do_checkpoint_callback);
+ commit_checkpoint_notify(full_buffer);
+ }
+ return 0;
+}
+
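+/*
+  Lifecycle of a pending_cookies batch: unlog() above appends cookies until
+  the fixed-size array is full, then hands the batch to
+  ha_commit_checkpoint_request() plus one extra self-notification. Each
+  commit_checkpoint_notify() below decrements pending_count; when it reaches
+  zero, every cookie in the batch is released via delete_entry() and the
+  batch is freed.
+*/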
+
+void
+TC_LOG_MMAP::commit_checkpoint_notify(void *cookie)
+{
+ uint count;
+ pending_cookies *pending= static_cast<pending_cookies *>(cookie);
+ mysql_mutex_lock(&LOCK_pending_checkpoint);
+ DBUG_ASSERT(pending->pending_count > 0);
+ count= --pending->pending_count;
+ mysql_mutex_unlock(&LOCK_pending_checkpoint);
+ if (count == 0)
+ {
+ uint i;
+ for (i= 0; i < sizeof(pending->cookies)/sizeof(pending->cookies[0]); ++i)
+ delete_entry(pending->cookies[i]);
+ my_free(pending);
+ }
+}
+
+
/**
erase xid from the page, update page free space counters/pointers.
cookie points directly to the memory where xid was logged.
*/
-int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid)
+int TC_LOG_MMAP::delete_entry(ulong cookie)
{
PAGE *p=pages+(cookie/tc_log_page_size);
my_xid *x=(my_xid *)(data+cookie);
- DBUG_ASSERT(*x == xid);
DBUG_ASSERT(x >= p->start && x < p->end);
mysql_mutex_lock(&p->lock);
@@ -7258,6 +8825,7 @@ void TC_LOG_MMAP::close()
mysql_mutex_destroy(&LOCK_sync);
mysql_mutex_destroy(&LOCK_active);
mysql_mutex_destroy(&LOCK_pool);
+ mysql_mutex_destroy(&LOCK_pending_checkpoint);
mysql_cond_destroy(&COND_pool);
mysql_cond_destroy(&COND_active);
mysql_cond_destroy(&COND_queue_busy);
@@ -7285,9 +8853,12 @@ void TC_LOG_MMAP::close()
}
if (inited>=5) // cannot do in the switch because of Windows
mysql_file_delete(key_file_tclog, logname, MYF(MY_WME));
+ if (pending_checkpoint)
+ my_free(pending_checkpoint);
inited=0;
}
+
int TC_LOG_MMAP::recover()
{
HASH xids;
@@ -7373,26 +8944,13 @@ int TC_LOG::using_heuristic_recover()
/****** transaction coordinator log for 2pc - binlog() based solution ******/
#define TC_LOG_BINLOG MYSQL_BIN_LOG
-/**
- @todo
- keep in-memory list of prepared transactions
- (add to list in log(), remove on unlog())
- and copy it to the new binlog if rotated
- but let's check the behaviour of tc_log_page_waits first!
-*/
-
int TC_LOG_BINLOG::open(const char *opt_name)
{
- LOG_INFO log_info;
int error= 1;
DBUG_ASSERT(total_ha_2pc > 1);
DBUG_ASSERT(opt_name && opt_name[0]);
- mysql_mutex_init(key_BINLOG_LOCK_prep_xids,
- &LOCK_prep_xids, MY_MUTEX_INIT_FAST);
- mysql_cond_init(key_BINLOG_COND_prep_xids, &COND_prep_xids, 0);
-
if (!my_b_inited(&index_file))
{
/* There was a failure to open the index file, can't open the binlog */
@@ -7402,78 +8960,22 @@ int TC_LOG_BINLOG::open(const char *opt_name)
if (using_heuristic_recover())
{
+ mysql_mutex_lock(&LOCK_log);
/* generate a new binlog to mask a corrupted one */
- open(opt_name, LOG_BIN, 0, WRITE_CACHE, 0, max_binlog_size, 0, TRUE);
+ open(opt_name, LOG_BIN, 0, WRITE_CACHE, max_binlog_size, 0, TRUE);
+ mysql_mutex_unlock(&LOCK_log);
cleanup();
return 1;
}
- if ((error= find_log_pos(&log_info, NullS, 1)))
- {
- if (error != LOG_INFO_EOF)
- sql_print_error("find_log_pos() failed (error: %d)", error);
- else
- error= 0;
- goto err;
- }
-
- {
- const char *errmsg;
- IO_CACHE log;
- File file;
- Log_event *ev=0;
- Format_description_log_event fdle(BINLOG_VERSION);
- char log_name[FN_REFLEN];
-
- if (! fdle.is_valid())
- goto err;
-
- do
- {
- strmake_buf(log_name, log_info.log_file_name);
- } while (!(error= find_next_log(&log_info, 1)));
-
- if (error != LOG_INFO_EOF)
- {
- sql_print_error("find_log_pos() failed (error: %d)", error);
- goto err;
- }
-
- if ((file= open_binlog(&log, log_name, &errmsg)) < 0)
- {
- sql_print_error("%s", errmsg);
- goto err;
- }
-
- if ((ev= Log_event::read_log_event(&log, 0, &fdle,
- opt_master_verify_checksum)) &&
- ev->get_type_code() == FORMAT_DESCRIPTION_EVENT &&
- ev->flags & LOG_EVENT_BINLOG_IN_USE_F)
- {
- sql_print_information("Recovering after a crash using %s", opt_name);
- error= recover(&log, (Format_description_log_event *)ev);
- }
- else
- error=0;
-
- delete ev;
- end_io_cache(&log);
- mysql_file_close(file, MYF(MY_WME));
-
- if (error)
- goto err;
- }
-
-err:
+ error= do_binlog_recovery(opt_name, true);
+ binlog_state_recover_done= true;
return error;
}
/** This is called on shutdown, after ha_panic. */
void TC_LOG_BINLOG::close()
{
- DBUG_ASSERT(prepared_xids==0);
- mysql_mutex_destroy(&LOCK_prep_xids);
- mysql_cond_destroy(&COND_prep_xids);
}
/*
@@ -7498,7 +9000,17 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
DEBUG_SYNC(thd, "binlog_after_log_and_order");
- DBUG_RETURN(!err);
+ if (err)
+ DBUG_RETURN(0);
+ /*
+    If using explicit user XA, we will not have an XID. We must still return
+    a non-zero cookie (as a zero cookie signals an error).
+ */
+ if (!xid || !cache_mngr->need_unlog)
+ DBUG_RETURN(BINLOG_COOKIE_DUMMY(cache_mngr->delayed_error));
+ else
+ DBUG_RETURN(BINLOG_COOKIE_MAKE(cache_mngr->binlog_id,
+ cache_mngr->delayed_error));
}
/*
@@ -7513,92 +9025,542 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
binary log.
*/
void
-TC_LOG_BINLOG::mark_xids_active(uint xid_count)
+TC_LOG_BINLOG::mark_xids_active(ulong binlog_id, uint xid_count)
{
+ xid_count_per_binlog *b;
+
DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active");
- DBUG_PRINT("info", ("xid_count=%u", xid_count));
- mysql_mutex_lock(&LOCK_prep_xids);
- prepared_xids+= xid_count;
- mysql_mutex_unlock(&LOCK_prep_xids);
+ DBUG_PRINT("info", ("binlog_id=%lu xid_count=%u", binlog_id, xid_count));
+
+ mysql_mutex_lock(&LOCK_xid_list);
+ I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list);
+ while ((b= it++))
+ {
+ if (b->binlog_id == binlog_id)
+ {
+ b->xid_count += xid_count;
+ break;
+ }
+ }
+ /*
+    As we do not delete elements until the count reaches zero, elements should
+    always be found.
+ */
+ DBUG_ASSERT(b);
+ mysql_mutex_unlock(&LOCK_xid_list);
DBUG_VOID_RETURN;
}
/*
- Once an XID is committed, it is safe to rotate the binary log, as it can no
- longer be needed during crash recovery.
+ Once an XID is committed, it can no longer be needed during crash recovery,
+ as it has been durably recorded on disk as "committed".
This function is called to mark an XID this way. It needs to decrease the
- count of pending XIDs, and signal the log rotator thread when it reaches zero.
+ count of pending XIDs in the corresponding binlog. When the count reaches
+ zero (for an "old" binlog that is not the active one), that binlog file no
+  longer needs to be scanned during crash recovery, so we can log a new binlog
+ checkpoint.
*/
void
-TC_LOG_BINLOG::mark_xid_done()
+TC_LOG_BINLOG::mark_xid_done(ulong binlog_id, bool write_checkpoint)
{
- my_bool send_signal;
+ xid_count_per_binlog *b;
+ bool first;
+ ulong current;
DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done");
- mysql_mutex_lock(&LOCK_prep_xids);
- // prepared_xids can be 0 if the transaction had ignorable errors.
- DBUG_ASSERT(prepared_xids >= 0);
- if (prepared_xids > 0)
- prepared_xids--;
- send_signal= (prepared_xids == 0);
- mysql_mutex_unlock(&LOCK_prep_xids);
- if (send_signal) {
- DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
- mysql_cond_signal(&COND_prep_xids);
+
+ mysql_mutex_lock(&LOCK_xid_list);
+ current= current_binlog_id;
+ I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list);
+ first= true;
+ while ((b= it++))
+ {
+ if (b->binlog_id == binlog_id)
+ {
+ --b->xid_count;
+ break;
+ }
+ first= false;
+ }
+ /* Binlog is always found, as we do not remove until count reaches 0 */
+ DBUG_ASSERT(b);
+ /*
+ If a RESET MASTER is pending, we are about to remove all log files, and
+ the RESET MASTER thread is waiting for all pending unlog() calls to
+ complete while holding LOCK_log. In this case we should not log a binlog
+ checkpoint event (it would be deleted immediately anyway and we would
+ deadlock on LOCK_log) but just signal the thread.
+ */
+ if (unlikely(reset_master_pending))
+ {
+ mysql_cond_signal(&COND_xid_list);
+ mysql_mutex_unlock(&LOCK_xid_list);
+ DBUG_VOID_RETURN;
+ }
+
+ if (likely(binlog_id == current) || b->xid_count != 0 || !first ||
+ !write_checkpoint)
+ {
+ /* No new binlog checkpoint reached yet. */
+ mysql_mutex_unlock(&LOCK_xid_list);
+ DBUG_VOID_RETURN;
}
+
+ /*
+ Now log a binlog checkpoint for the first binlog file with a non-zero count.
+
+ Note that it is possible (though perhaps unlikely) that when count of
+ binlog (N-2) drops to zero, binlog (N-1) is already at zero. So we may
+ need to skip several entries before we find the one to log in the binlog
+ checkpoint event.
+
+ We chain the locking of LOCK_xid_list and LOCK_log, so that we ensure that
+ Binlog_checkpoint_events are logged in order. This simplifies recovery a
+ bit, as it can just take the last binlog checkpoint in the log, rather
+ than compare all found against each other to find the one pointing to the
+ most recent binlog.
+
+    Note also that we need to first release LOCK_xid_list, then acquire
+    LOCK_log, then re-acquire LOCK_xid_list. If we were to take LOCK_log while
+ holding LOCK_xid_list, we might deadlock with other threads that take the
+ locks in the opposite order.
+ */
+
+ ++mark_xid_done_waiting;
+ mysql_mutex_unlock(&LOCK_xid_list);
+ mysql_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_xid_list);
+ --mark_xid_done_waiting;
+ if (unlikely(reset_master_pending))
+ mysql_cond_signal(&COND_xid_list);
+ /* We need to reload current_binlog_id due to release/re-take of lock. */
+ current= current_binlog_id;
+
+ for (;;)
+ {
+ /* Remove initial element(s) with zero count. */
+ b= binlog_xid_count_list.head();
+ /*
+ We must not remove all elements in the list - the entry for the current
+ binlog must be present always.
+ */
+ DBUG_ASSERT(b);
+ if (b->binlog_id == current || b->xid_count > 0)
+ break;
+ my_free(binlog_xid_count_list.get());
+ }
+
+ mysql_mutex_unlock(&LOCK_xid_list);
+ write_binlog_checkpoint_event_already_locked(b->binlog_name,
+ b->binlog_name_len);
+ mysql_mutex_unlock(&LOCK_log);
DBUG_VOID_RETURN;
}
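+
+/*
+  The BINLOG_COOKIE_* helpers (defined elsewhere in this patch, not shown in
+  this hunk) pack the binlog_id together with the delayed-error flag into the
+  single ulong cookie returned from log_and_order(), reserving a dummy value
+  for transactions that do not need a per-binlog unlog(). unlog() below
+  unpacks the cookie again via BINLOG_COOKIE_IS_DUMMY(),
+  BINLOG_COOKIE_GET_ID() and BINLOG_COOKIE_GET_ERROR_FLAG().
+*/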
int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
{
DBUG_ENTER("TC_LOG_BINLOG::unlog");
- if (xid)
- mark_xid_done();
- /* As ::write_transaction_to_binlog() did not rotate, do it here. */
- DBUG_RETURN(rotate_and_purge(0));
+ if (!xid)
+ DBUG_RETURN(0);
+
+ if (!BINLOG_COOKIE_IS_DUMMY(cookie))
+ mark_xid_done(BINLOG_COOKIE_GET_ID(cookie), true);
+ /*
+ See comment in trx_group_commit_leader() - if rotate() gave a failure,
+    we delay the return of the error code until here.
+ */
+ DBUG_RETURN(BINLOG_COOKIE_GET_ERROR_FLAG(cookie));
+}
+
+void
+TC_LOG_BINLOG::commit_checkpoint_notify(void *cookie)
+{
+ xid_count_per_binlog *entry= static_cast<xid_count_per_binlog *>(cookie);
+ mysql_mutex_lock(&LOCK_binlog_background_thread);
+ entry->next_in_queue= binlog_background_thread_queue;
+ binlog_background_thread_queue= entry;
+ mysql_cond_signal(&COND_binlog_background_thread);
+ mysql_mutex_unlock(&LOCK_binlog_background_thread);
+}
+
+/*
+ Binlog background thread.
+
+ This thread is used to log binlog checkpoints in the background, rather than
+ in the context of random storage engine threads that happen to call
+ commit_checkpoint_notify_ha(); such threads may not tolerate the delay of
+ syncing the binlog to disk, and may not be set up with my_thread_init()
+ and the other infrastructure a full thread needs.
+
+ In the future, this thread could also be used to do log rotation in the
+ background, which could eliminate all stalls around binlog rotations.
+*/
+pthread_handler_t
+binlog_background_thread(void *arg __attribute__((unused)))
+{
+ bool stop;
+ MYSQL_BIN_LOG::xid_count_per_binlog *queue, *next;
+ THD *thd;
+ my_thread_init();
+ DBUG_ENTER("binlog_background_thread");
+
+ thd= new THD;
+ thd->system_thread= SYSTEM_THREAD_BINLOG_BACKGROUND;
+ thd->thread_stack= (char*) &thd; /* Set approximate stack start */
+ mysql_mutex_lock(&LOCK_thread_count);
+ thd->thread_id= thread_id++;
+ mysql_mutex_unlock(&LOCK_thread_count);
+ thd->store_globals();
+ thd->security_ctx->skip_grants();
+ thd->set_command(COM_DAEMON);
+
+ /*
+ Load the slave replication GTID state from the mysql.gtid_slave_pos
+ table.
+
+ This is mostly so that we can start our seq_no counter from the highest
+ seq_no seen by a slave. This gives us a way to tell whether a transaction
+ logged by ourselves as master is newer or older than a replicated
+ transaction.
+ */
+#ifdef HAVE_REPLICATION
+ if (rpl_load_gtid_slave_state(thd))
+ sql_print_warning("Failed to load slave replication state from table "
+ "%s.%s: %u: %s", "mysql",
+ rpl_gtid_slave_state_table_name.str,
+ thd->get_stmt_da()->sql_errno(),
+ thd->get_stmt_da()->message());
+#endif
+
+ mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread);
+ binlog_background_thread_started= true;
+ mysql_cond_signal(&mysql_bin_log.COND_binlog_background_thread_end);
+ mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread);
+
+ for (;;)
+ {
+ /*
+ Wait until there is something in the queue to process, or we are asked
+ to shut down.
+ */
+ THD_STAGE_INFO(thd, stage_binlog_waiting_background_tasks);
+ mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread);
+ for (;;)
+ {
+ stop= binlog_background_thread_stop;
+ queue= binlog_background_thread_queue;
+ if (stop && !mysql_bin_log.is_xidlist_idle())
+ {
+ /*
+ Delay stop until all pending binlog checkpoints have been processed.
+ */
+ stop= false;
+ }
+ if (stop || queue)
+ break;
+ mysql_cond_wait(&mysql_bin_log.COND_binlog_background_thread,
+ &mysql_bin_log.LOCK_binlog_background_thread);
+ }
+ /* Grab the queue, if any. */
+ binlog_background_thread_queue= NULL;
+ mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread);
+
+ /* Process any incoming commit_checkpoint_notify() calls. */
+ DBUG_EXECUTE_IF("inject_binlog_background_thread_before_mark_xid_done",
+ DBUG_ASSERT(!debug_sync_set_action(
+ thd,
+ STRING_WITH_LEN("binlog_background_thread_before_mark_xid_done "
+ "SIGNAL injected_binlog_background_thread "
+ "WAIT_FOR something_that_will_never_happen "
+ "TIMEOUT 2")));
+ );
+ while (queue)
+ {
+ THD_STAGE_INFO(thd, stage_binlog_processing_checkpoint_notify);
+ DEBUG_SYNC(thd, "binlog_background_thread_before_mark_xid_done");
+ /* Set the thread start time */
+ thd->set_time();
+ /* Grab next pointer first, as mark_xid_done() may free the element. */
+ next= queue->next_in_queue;
+ mysql_bin_log.mark_xid_done(queue->binlog_id, true);
+ queue= next;
+
+ DBUG_EXECUTE_IF("binlog_background_checkpoint_processed",
+ DBUG_ASSERT(!debug_sync_set_action(
+ thd,
+ STRING_WITH_LEN("now SIGNAL binlog_background_checkpoint_processed")));
+ );
+ }
+
+ if (stop)
+ break;
+ }
+
+ THD_STAGE_INFO(thd, stage_binlog_stopping_background_thread);
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ delete thd;
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ my_thread_end();
+
+ /* Signal that we are (almost) stopped. */
+ mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread);
+ binlog_background_thread_stop= false;
+ mysql_cond_signal(&mysql_bin_log.COND_binlog_background_thread_end);
+ mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread);
+
+ DBUG_RETURN(0);
}
-int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
+#ifdef HAVE_PSI_INTERFACE
+static PSI_thread_key key_thread_binlog;
+
+static PSI_thread_info all_binlog_threads[]=
+{
+ { &key_thread_binlog, "binlog_background", PSI_FLAG_GLOBAL},
+};
+#endif /* HAVE_PSI_INTERFACE */
+
+static bool
+start_binlog_background_thread()
{
- Log_event *ev;
+ pthread_t th;
+
+#ifdef HAVE_PSI_INTERFACE
+ if (PSI_server)
+ PSI_server->register_thread("sql", all_binlog_threads,
+ array_elements(all_binlog_threads));
+#endif
+
+ if (mysql_thread_create(key_thread_binlog, &th, &connection_attrib,
+ binlog_background_thread, NULL))
+ return 1;
+
+ /*
+ Wait for the thread to have started (so we know that the slave replication
+ state is loaded and we have correct global_gtid_counter).
+ */
+ mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread);
+ while (!binlog_background_thread_started)
+ mysql_cond_wait(&mysql_bin_log.COND_binlog_background_thread_end,
+ &mysql_bin_log.LOCK_binlog_background_thread);
+ mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread);
+
+ return 0;
+}
+
+
+int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
+ IO_CACHE *first_log,
+ Format_description_log_event *fdle, bool do_xa)
+{
+ Log_event *ev= NULL;
HASH xids;
MEM_ROOT mem_root;
+ char binlog_checkpoint_name[FN_REFLEN];
+ bool binlog_checkpoint_found;
+ bool first_round;
+ IO_CACHE log;
+ File file= -1;
+ const char *errmsg;
+#ifdef HAVE_REPLICATION
+ rpl_gtid last_gtid;
+ bool last_gtid_standalone= false;
+ bool last_gtid_valid= false;
+#endif
if (! fdle->is_valid() ||
- my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
- sizeof(my_xid), 0, 0, MYF(0)))
+ (do_xa && my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
+ sizeof(my_xid), 0, 0, MYF(0))))
goto err1;
- init_alloc_root(&mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE);
+ if (do_xa)
+ init_alloc_root(&mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE, MYF(0));
fdle->flags&= ~LOG_EVENT_BINLOG_IN_USE_F; // abort on the first error
- while ((ev= Log_event::read_log_event(log, 0, fdle,
- opt_master_verify_checksum))
- && ev->is_valid())
+ /*
+ Scan the binlog for XIDs that need to be committed if still in the
+ prepared stage.
+
+ Start with the latest binlog file, then continue with any other binlog
+ files if the last found binlog checkpoint indicates it is needed.
+ */
+
+ binlog_checkpoint_found= false;
+ first_round= true;
+ for (;;)
{
- if (ev->get_type_code() == XID_EVENT)
+ while ((ev= Log_event::read_log_event(first_round ? first_log : &log,
+ 0, fdle, opt_master_verify_checksum))
+ && ev->is_valid())
+ {
+ enum Log_event_type typ= ev->get_type_code();
+ switch (typ)
+ {
+ case XID_EVENT:
+ {
+ if (do_xa)
+ {
+ Xid_log_event *xev=(Xid_log_event *)ev;
+ uchar *x= (uchar *) memdup_root(&mem_root, (uchar*) &xev->xid,
+ sizeof(xev->xid));
+ if (!x || my_hash_insert(&xids, x))
+ goto err2;
+ }
+ break;
+ }
+ case BINLOG_CHECKPOINT_EVENT:
+ if (first_round && do_xa)
+ {
+ uint dir_len;
+ Binlog_checkpoint_log_event *cev= (Binlog_checkpoint_log_event *)ev;
+ if (cev->binlog_file_len >= FN_REFLEN)
+ sql_print_warning("Incorrect binlog checkpoint event with too "
+ "long file name found.");
+ else
+ {
+ /*
+ Note that we cannot use make_log_name() here, as we have not yet
+ initialised MYSQL_BIN_LOG::log_file_name.
+ */
+ dir_len= dirname_length(last_log_name);
+ strmake(strnmov(binlog_checkpoint_name, last_log_name, dir_len),
+ cev->binlog_file_name, FN_REFLEN - 1 - dir_len);
+ binlog_checkpoint_found= true;
+ }
+ }
+ break;
+ case GTID_LIST_EVENT:
+ if (first_round)
+ {
+ Gtid_list_log_event *glev= (Gtid_list_log_event *)ev;
+
+ /* Initialise the binlog state from the Gtid_list event. */
+ if (rpl_global_gtid_binlog_state.load(glev->list, glev->count))
+ goto err2;
+ }
+ break;
+
+#ifdef HAVE_REPLICATION
+ case GTID_EVENT:
+ if (first_round)
+ {
+ Gtid_log_event *gev= (Gtid_log_event *)ev;
+
+ /* Update the binlog state with any GTID logged after Gtid_list. */
+ last_gtid.domain_id= gev->domain_id;
+ last_gtid.server_id= gev->server_id;
+ last_gtid.seq_no= gev->seq_no;
+ last_gtid_standalone=
+ ((gev->flags2 & Gtid_log_event::FL_STANDALONE) ? true : false);
+ last_gtid_valid= true;
+ }
+ break;
+#endif
+
+ default:
+ /* Nothing. */
+ break;
+ }
+
+#ifdef HAVE_REPLICATION
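+ /*
+ Record the GTID in the binlog state once we see the event that ends its
+ event group: the first event not part of the group for a standalone
+ GTID, or the XID / COMMIT / ROLLBACK event for a transactional one.
+ */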
+ if (last_gtid_valid &&
+ ((last_gtid_standalone && !ev->is_part_of_group(typ)) ||
+ (!last_gtid_standalone &&
+ (typ == XID_EVENT ||
+ (typ == QUERY_EVENT &&
+ (((Query_log_event *)ev)->is_commit() ||
+ ((Query_log_event *)ev)->is_rollback()))))))
+ {
+ if (rpl_global_gtid_binlog_state.update_nolock(&last_gtid, false))
+ goto err2;
+ last_gtid_valid= false;
+ }
+#endif
+
+ delete ev;
+ ev= NULL;
+ }
+
+ if (!do_xa)
+ break;
+ /*
+ If the last binlog checkpoint event points to an older log, we have to
+ scan all logs from there also, to get all possible XIDs to recover.
+
+ If there was no binlog checkpoint event at all, this means the log was
+ written by an older version of MariaDB (or MySQL) - these always have an
+ (implicit) binlog checkpoint event at the start of the last binlog file.
+ */
+ if (first_round)
{
- Xid_log_event *xev=(Xid_log_event *)ev;
- uchar *x= (uchar *) memdup_root(&mem_root, (uchar*) &xev->xid,
- sizeof(xev->xid));
- if (!x || my_hash_insert(&xids, x))
+ if (!binlog_checkpoint_found)
+ break;
+ first_round= false;
+ DBUG_EXECUTE_IF("xa_recover_expect_master_bin_000004",
+ if (0 != strcmp("./master-bin.000004", binlog_checkpoint_name) &&
+ 0 != strcmp(".\\master-bin.000004", binlog_checkpoint_name))
+ DBUG_SUICIDE();
+ );
+ if (find_log_pos(linfo, binlog_checkpoint_name, 1))
+ {
+ sql_print_error("Binlog file '%s' not found in binlog index, needed "
+ "for recovery. Aborting.", binlog_checkpoint_name);
goto err2;
+ }
+ }
+ else
+ {
+ end_io_cache(&log);
+ mysql_file_close(file, MYF(MY_WME));
+ file= -1;
+ }
+
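+ /*
+ The last binlog file was already scanned in the first round (from
+ first_log), so stop before opening it a second time.
+ */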
+ if (!strcmp(linfo->log_file_name, last_log_name))
+ break; // No more files to do
+ if ((file= open_binlog(&log, linfo->log_file_name, &errmsg)) < 0)
+ {
+ sql_print_error("%s", errmsg);
+ goto err2;
+ }
+ /*
+ We do not need to read the Format_description_log_event of other binlog
+ files. It is not possible for a binlog checkpoint to span multiple
+ binlog files written by different versions of the server, so the one read
+ from the first file can be used when reading all of them.
+ */
+ if (find_next_log(linfo, 1))
+ {
+ sql_print_error("Error reading binlog files during recovery. Aborting.");
+ goto err2;
}
- delete ev;
}
- if (ha_recover(&xids))
- goto err2;
+ if (do_xa)
+ {
+ if (ha_recover(&xids))
+ goto err2;
- free_root(&mem_root, MYF(0));
- my_hash_free(&xids);
+ free_root(&mem_root, MYF(0));
+ my_hash_free(&xids);
+ }
return 0;
err2:
- free_root(&mem_root, MYF(0));
- my_hash_free(&xids);
+ delete ev;
+ if (file >= 0)
+ {
+ end_io_cache(&log);
+ mysql_file_close(file, MYF(MY_WME));
+ }
+ if (do_xa)
+ {
+ free_root(&mem_root, MYF(0));
+ my_hash_free(&xids);
+ }
err1:
sql_print_error("Crash recovery failed. Either correct the problem "
"(if it's, for example, out of memory error) and restart, "
@@ -7608,6 +9570,110 @@ err1:
}
+int
+MYSQL_BIN_LOG::do_binlog_recovery(const char *opt_name, bool do_xa_recovery)
+{
+ LOG_INFO log_info;
+ const char *errmsg;
+ IO_CACHE log;
+ File file;
+ Log_event *ev= 0;
+ Format_description_log_event fdle(BINLOG_VERSION);
+ char log_name[FN_REFLEN];
+ int error;
+
+ if ((error= find_log_pos(&log_info, NullS, 1)))
+ {
+ /*
+ If there are no binlog files (LOG_INFO_EOF), then we still try to read
+ the .state file to restore the binlog state. This allows copying a server
+ to provision a new one without copying the binlog files (except the
+ master-bin.state file), while still preserving the correct binlog state.
+ */
+ if (error != LOG_INFO_EOF)
+ sql_print_error("find_log_pos() failed (error: %d)", error);
+ else
+ {
+ error= read_state_from_file();
+ if (error == 2)
+ {
+ /*
+ Having no binlog files and no binlog state is not an error (e.g. the
+ first server start after a fresh installation).
+ */
+ error= 0;
+ }
+ }
+ return error;
+ }
+
+ if (! fdle.is_valid())
+ return 1;
+
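+ /* Walk the binlog index to find the name of the last (newest) binlog. */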
+ do
+ {
+ strmake_buf(log_name, log_info.log_file_name);
+ } while (!(error= find_next_log(&log_info, 1)));
+
+ if (error != LOG_INFO_EOF)
+ {
+ sql_print_error("find_log_pos() failed (error: %d)", error);
+ return error;
+ }
+
+ if ((file= open_binlog(&log, log_name, &errmsg)) < 0)
+ {
+ sql_print_error("%s", errmsg);
+ return 1;
+ }
+
+ if ((ev= Log_event::read_log_event(&log, 0, &fdle,
+ opt_master_verify_checksum)) &&
+ ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
+ {
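+ /*
+ LOG_EVENT_BINLOG_IN_USE_F is cleared only when the binlog is closed
+ cleanly; if it is still set, the server crashed while this binlog was
+ active and a full recovery scan is needed.
+ */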
+ if (ev->flags & LOG_EVENT_BINLOG_IN_USE_F)
+ {
+ sql_print_information("Recovering after a crash using %s", opt_name);
+ error= recover(&log_info, log_name, &log,
+ (Format_description_log_event *)ev, do_xa_recovery);
+ }
+ else
+ {
+ error= read_state_from_file();
+ if (error == 2)
+ {
+ /*
+ The binlog exists, but the .state file is missing. This is normal if
+ this is the first master start after a major upgrade to 10.0 (with
+ GTID support).
+
+ However, it could also be that the .state file was lost somehow, and
+ in this case it could be a serious issue, as we would set the wrong
+ binlog state in the next binlog file to be created, and GTID
+ processing would be corrupted. A common way for this to happen is copying
+ files from an old server to a new one and forgetting the .state file.
+
+ So in this case, we want to try to recover the binlog state by
+ scanning the last binlog file (but we do not need any XA recovery).
+
+ ToDo: We could avoid one scan at first start after major upgrade, by
+ detecting that there is no GTID_LIST event at the start of the
+ binlog file, and stopping the scan in that case.
+ */
+ error= recover(&log_info, log_name, &log,
+ (Format_description_log_event *)ev, false);
+ }
+ }
+ }
+
+ delete ev;
+ end_io_cache(&log);
+ mysql_file_close(file, MYF(MY_WME));
+
+ return error;
+}
+
+
#ifdef INNODB_COMPATIBILITY_HOOKS
/**
Get the file name of the MySQL binlog.
@@ -7662,10 +9728,13 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
{
ulong value= *((ulong *)save);
bool check_purge= false;
+ ulong prev_binlog_id;
+ LINT_INIT(prev_binlog_id);
mysql_mutex_lock(mysql_bin_log.get_log_lock());
if(mysql_bin_log.is_open())
{
+ prev_binlog_id= mysql_bin_log.current_binlog_id;
if (binlog_checksum_options != value)
mysql_bin_log.checksum_alg_reset= (uint8) value;
if (mysql_bin_log.rotate(true, &check_purge))
@@ -7679,7 +9748,7 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
mysql_bin_log.checksum_alg_reset= BINLOG_CHECKSUM_ALG_UNDEF;
mysql_mutex_unlock(mysql_bin_log.get_log_lock());
if (check_purge)
- mysql_bin_log.purge();
+ mysql_bin_log.checkpoint_and_purge(prev_binlog_id);
}
@@ -7745,7 +9814,7 @@ set_binlog_snapshot_file(const char *src)
Copy out current values of status variables, for SHOW STATUS or
information_schema.global_status.
- This is called only under LOCK_status, so we can fill in a static array.
+ This is called only under LOCK_show_status, so we can fill in a static array.
*/
void
TC_LOG_BINLOG::set_status_variables(THD *thd)
@@ -7767,6 +9836,11 @@ TC_LOG_BINLOG::set_status_variables(THD *thd)
binlog_snapshot_position= last_commit_pos_offset;
}
mysql_mutex_unlock(&LOCK_commit_ordered);
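+ /*
+ Copy the group commit trigger counters under LOCK_prepare_ordered so
+ that SHOW STATUS sees a consistent snapshot.
+ */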
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ binlog_status_group_commit_trigger_count= this->group_commit_trigger_count;
+ binlog_status_group_commit_trigger_timeout= this->group_commit_trigger_timeout;
+ binlog_status_group_commit_trigger_lock_wait= this->group_commit_trigger_lock_wait;
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
if (have_snapshot)
{