summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc1825
1 files changed, 1438 insertions, 387 deletions
diff --git a/sql/log.cc b/sql/log.cc
index c6b41447d6a..c60cca3a2e9 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -1,4 +1,5 @@
-/* Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved.
+/* Copyright (c) 2000, 2011, Oracle and/or its affiliates.
+ Copyright (c) 2010-2011 Monty Program Ab
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -38,6 +39,7 @@
#include "rpl_filter.h"
#include "rpl_rli.h"
#include "sql_audit.h"
+#include "log_slow.h"
#include <my_dir.h>
#include <stdarg.h>
@@ -49,6 +51,7 @@
#include "sql_plugin.h"
#include "rpl_handler.h"
+#include "debug_sync.h"
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
@@ -70,6 +73,38 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv);
static int binlog_commit(handlerton *hton, THD *thd, bool all);
static int binlog_rollback(handlerton *hton, THD *thd, bool all);
static int binlog_prepare(handlerton *hton, THD *thd, bool all);
+static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd);
+
+static LEX_STRING const write_error_msg=
+ { C_STRING_WITH_LEN("error writing to the binary log") };
+
+static my_bool opt_optimize_thread_scheduling= TRUE;
+ulong binlog_checksum_options;
+#ifndef DBUG_OFF
+static ulong opt_binlog_dbug_fsync_sleep= 0;
+#endif
+
+mysql_mutex_t LOCK_prepare_ordered;
+mysql_mutex_t LOCK_commit_ordered;
+
+static ulonglong binlog_status_var_num_commits;
+static ulonglong binlog_status_var_num_group_commits;
+static char binlog_snapshot_file[FN_REFLEN];
+static ulonglong binlog_snapshot_position;
+
+static SHOW_VAR binlog_status_vars_detail[]=
+{
+ {"commits",
+ (char *)&binlog_status_var_num_commits, SHOW_LONGLONG},
+ {"group_commits",
+ (char *)&binlog_status_var_num_group_commits, SHOW_LONGLONG},
+ {"snapshot_file",
+ (char *)&binlog_snapshot_file, SHOW_CHAR},
+ {"snapshot_position",
+ (char *)&binlog_snapshot_position, SHOW_LONGLONG},
+ {NullS, NullS, SHOW_LONG}
+};
+
/**
purge logs, master and slave sides both, related error code
@@ -147,59 +182,27 @@ sql_print_message_func sql_print_message_handlers[3] =
sql_print_error
};
-/**
- Create the name of the log specified.
-
- This method forms a new path + file name for the
- log specified in @c name.
- @param[IN] buff Location for building new string.
- @param[IN] name Name of the log file.
- @param[IN] log_ext The extension for the log (e.g. .log).
-
- @returns Pointer to new string containing the name.
+/**
+ Create the name of the log file
+
+ @param[OUT] out a pointer to a new allocated name will go there
+ @param[IN] log_ext The extension for the file (e.g .log)
+ @param[IN] once whether to use malloc_once or a normal malloc.
*/
-char *make_log_name(char *buff, const char *name, const char* log_ext)
-{
- strmake(buff, name, FN_REFLEN-5);
- return fn_format(buff, buff, mysql_real_data_home, log_ext,
- MYF(MY_UNPACK_FILENAME|MY_REPLACE_EXT));
-}
-
-/*
- Helper class to hold a mutex for the duration of the
- block.
-
- Eliminates the need for explicit unlocking of mutexes on, e.g.,
- error returns. On passing a null pointer, the sentry will not do
- anything.
- */
-class Mutex_sentry
+void make_default_log_name(char **out, const char* log_ext, bool once)
{
-public:
- Mutex_sentry(mysql_mutex_t *mutex)
- : m_mutex(mutex)
+ char buff[FN_REFLEN+10];
+ fn_format(buff, opt_log_basename, "", log_ext, MYF(MY_REPLACE_EXT));
+ if (once)
+ *out= my_once_strdup(buff, MYF(MY_WME));
+ else
{
- if (m_mutex)
- mysql_mutex_lock(mutex);
+ my_free(*out);
+ *out= my_strdup(buff, MYF(MY_WME));
}
+}
- ~Mutex_sentry()
- {
- if (m_mutex)
- mysql_mutex_unlock(m_mutex);
-#ifndef DBUG_OFF
- m_mutex= 0;
-#endif
- }
-
-private:
- mysql_mutex_t *m_mutex;
-
- // It's not allowed to copy this object in any way
- Mutex_sentry(Mutex_sentry const&);
- void operator=(Mutex_sentry const&);
-};
/*
Helper classes to store non-transactional and transactional data
@@ -421,6 +424,7 @@ public:
ulong *param_ptr_binlog_stmt_cache_disk_use,
ulong *param_ptr_binlog_cache_use,
ulong *param_ptr_binlog_cache_disk_use)
+ : last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0)
{
stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size,
param_ptr_binlog_stmt_cache_use,
@@ -428,11 +432,20 @@ public:
trx_cache.set_binlog_cache_info(param_max_binlog_cache_size,
param_ptr_binlog_cache_use,
param_ptr_binlog_cache_disk_use);
+ last_commit_pos_file[0]= 0;
}
- void reset_cache(binlog_cache_data* cache_data)
+ void reset(bool do_stmt, bool do_trx)
{
- cache_data->reset();
+ if (do_stmt)
+ stmt_cache.reset();
+ if (do_trx)
+ {
+ trx_cache.reset();
+ using_xa= FALSE;
+ last_commit_pos_file[0]= 0;
+ last_commit_pos_offset= 0;
+ }
}
binlog_cache_data* get_binlog_cache_data(bool is_transactional)
@@ -449,6 +462,23 @@ public:
binlog_cache_data trx_cache;
+ /*
+ Binlog position for current transaction.
+ For START TRANSACTION WITH CONSISTENT SNAPSHOT, this is the binlog
+ position corresponding to the snapshot taken. During (and after) commit,
+ this is set to the binlog position corresponding to just after the
+ commit (so storage engines can store it in their transaction log).
+ */
+ char last_commit_pos_file[FN_REFLEN];
+ my_off_t last_commit_pos_offset;
+
+ /*
+ Flag set true if this transaction is committed with log_xid() as part of
+ XA, false if not.
+ */
+ bool using_xa;
+ my_xid xa_xid;
+
private:
binlog_cache_mngr& operator=(const binlog_cache_mngr& info);
@@ -551,7 +581,7 @@ void Log_to_csv_event_handler::cleanup()
*/
bool Log_to_csv_event_handler::
- log_general(THD *thd, time_t event_time, const char *user_host,
+ log_general(THD *thd, my_hrtime_t event_time, const char *user_host,
uint user_host_len, int thread_id,
const char *command_type, uint command_type_len,
const char *sql_text, uint sql_text_len,
@@ -568,6 +598,7 @@ bool Log_to_csv_event_handler::
Open_tables_backup open_tables_backup;
ulonglong save_thd_options;
bool save_time_zone_used;
+ DBUG_ENTER("log_general");
/*
CSV uses TIME_to_timestamp() internally if table needs to be repaired
@@ -603,7 +634,7 @@ bool Log_to_csv_event_handler::
need_close= TRUE;
if (table->file->extra(HA_EXTRA_MARK_AS_LOG_TABLE) ||
- table->file->ha_rnd_init(0))
+ table->file->ha_rnd_init_with_error(0))
goto err;
need_rnd_end= TRUE;
@@ -627,8 +658,8 @@ bool Log_to_csv_event_handler::
DBUG_ASSERT(table->field[0]->type() == MYSQL_TYPE_TIMESTAMP);
- ((Field_timestamp*) table->field[0])->store_timestamp((my_time_t)
- event_time);
+ ((Field_timestamp*) table->field[0])->store_TIME(
+ hrtime_to_my_time(event_time), hrtime_sec_part(event_time));
/* do a write */
if (table->field[1]->store(user_host, user_host_len, client_cs) ||
@@ -681,7 +712,7 @@ err:
thd->variables.option_bits= save_thd_options;
thd->time_zone_used= save_time_zone_used;
- return result;
+ DBUG_RETURN(result);
}
@@ -692,7 +723,6 @@ err:
log_slow()
thd THD of the query
current_time current timestamp
- query_start_arg command start timestamp
user_host the pointer to the string with user@host info
user_host_len length of the user_host string. this is computed once
and passed to all general log event handlers
@@ -715,7 +745,7 @@ err:
*/
bool Log_to_csv_event_handler::
- log_slow(THD *thd, time_t current_time, time_t query_start_arg,
+ log_slow(THD *thd, my_hrtime_t current_time,
const char *user_host, uint user_host_len,
ulonglong query_utime, ulonglong lock_utime, bool is_command,
const char *sql_text, uint sql_text_len)
@@ -729,6 +759,11 @@ bool Log_to_csv_event_handler::
Open_tables_backup open_tables_backup;
CHARSET_INFO *client_cs= thd->variables.character_set_client;
bool save_time_zone_used;
+ long query_time= (long) min(query_utime/1000000, TIME_MAX_VALUE_SECONDS);
+ long lock_time= (long) min(lock_utime/1000000, TIME_MAX_VALUE_SECONDS);
+ long query_time_micro= (long) (query_utime % 1000000);
+ long lock_time_micro= (long) (lock_utime % 1000000);
+
DBUG_ENTER("Log_to_csv_event_handler::log_slow");
thd->push_internal_handler(& error_handler);
@@ -749,7 +784,7 @@ bool Log_to_csv_event_handler::
need_close= TRUE;
if (table->file->extra(HA_EXTRA_MARK_AS_LOG_TABLE) ||
- table->file->ha_rnd_init(0))
+ table->file->ha_rnd_init_with_error(0))
goto err;
need_rnd_end= TRUE;
@@ -765,45 +800,34 @@ bool Log_to_csv_event_handler::
/* store the time and user values */
DBUG_ASSERT(table->field[0]->type() == MYSQL_TYPE_TIMESTAMP);
- ((Field_timestamp*) table->field[0])->store_timestamp((my_time_t)
- current_time);
+ ((Field_timestamp*) table->field[0])->store_TIME(
+ hrtime_to_my_time(current_time), hrtime_sec_part(current_time));
if (table->field[1]->store(user_host, user_host_len, client_cs))
goto err;
- if (query_start_arg)
- {
- longlong query_time= (longlong) (query_utime/1000000);
- longlong lock_time= (longlong) (lock_utime/1000000);
- /*
- A TIME field can not hold the full longlong range; query_time or
- lock_time may be truncated without warning here, if greater than
- 839 hours (~35 days)
- */
- MYSQL_TIME t;
- t.neg= 0;
+ /*
+ A TIME field can not hold the full longlong range; query_time or
+ lock_time may be truncated without warning here, if greater than
+ 839 hours (~35 days)
+ */
+ MYSQL_TIME t;
+ t.neg= 0;
+
+ /* fill in query_time field */
+ calc_time_from_sec(&t, query_time, query_time_micro);
+ if (table->field[2]->store_time(&t))
+ goto err;
+ /* lock_time */
+ calc_time_from_sec(&t, lock_time, lock_time_micro);
+ if (table->field[3]->store_time(&t))
+ goto err;
+ /* rows_sent */
+ if (table->field[4]->store((longlong) thd->sent_row_count, TRUE))
+ goto err;
+ /* rows_examined */
+ if (table->field[5]->store((longlong) thd->examined_row_count, TRUE))
+ goto err;
- /* fill in query_time field */
- calc_time_from_sec(&t, (long) min(query_time, (longlong) TIME_MAX_VALUE_SECONDS), 0);
- if (table->field[2]->store_time(&t, MYSQL_TIMESTAMP_TIME))
- goto err;
- /* lock_time */
- calc_time_from_sec(&t, (long) min(lock_time, (longlong) TIME_MAX_VALUE_SECONDS), 0);
- if (table->field[3]->store_time(&t, MYSQL_TIMESTAMP_TIME))
- goto err;
- /* rows_sent */
- if (table->field[4]->store((longlong) thd->sent_row_count, TRUE))
- goto err;
- /* rows_examined */
- if (table->field[5]->store((longlong) thd->examined_row_count, TRUE))
- goto err;
- }
- else
- {
- table->field[2]->set_null();
- table->field[3]->set_null();
- table->field[4]->set_null();
- table->field[5]->set_null();
- }
/* fill database field */
if (thd->db)
{
@@ -935,14 +959,14 @@ void Log_to_file_event_handler::init_pthread_objects()
/** Wrapper around MYSQL_LOG::write() for slow log. */
bool Log_to_file_event_handler::
- log_slow(THD *thd, time_t current_time, time_t query_start_arg,
+ log_slow(THD *thd, my_hrtime_t current_time,
const char *user_host, uint user_host_len,
ulonglong query_utime, ulonglong lock_utime, bool is_command,
const char *sql_text, uint sql_text_len)
{
Silence_log_table_errors error_handler;
thd->push_internal_handler(&error_handler);
- bool retval= mysql_slow_log.write(thd, current_time, query_start_arg,
+ bool retval= mysql_slow_log.write(thd, hrtime_to_my_time(current_time),
user_host, user_host_len,
query_utime, lock_utime, is_command,
sql_text, sql_text_len);
@@ -957,7 +981,7 @@ bool Log_to_file_event_handler::
*/
bool Log_to_file_event_handler::
- log_general(THD *thd, time_t event_time, const char *user_host,
+ log_general(THD *thd, my_hrtime_t event_time, const char *user_host,
uint user_host_len, int thread_id,
const char *command_type, uint command_type_len,
const char *sql_text, uint sql_text_len,
@@ -965,7 +989,8 @@ bool Log_to_file_event_handler::
{
Silence_log_table_errors error_handler;
thd->push_internal_handler(&error_handler);
- bool retval= mysql_log.write(event_time, user_host, user_host_len,
+ bool retval= mysql_log.write(hrtime_to_time(event_time), user_host,
+ user_host_len,
thread_id, command_type, command_type_len,
sql_text, sql_text_len);
thd->pop_internal_handler();
@@ -1100,8 +1125,6 @@ void LOGGER::init_log_tables()
bool LOGGER::flush_logs(THD *thd)
{
- int rc= 0;
-
/*
Now we lock logger, as nobody should be able to use logging routines while
log tables are closed
@@ -1113,7 +1136,7 @@ bool LOGGER::flush_logs(THD *thd)
/* end of log flush */
logger.unlock();
- return rc;
+ return 0;
}
@@ -1200,8 +1223,6 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length,
if (*slow_log_handler_list)
{
- time_t current_time;
-
/* do not log slow queries from replication threads */
if (thd->slave_thread && !opt_log_slow_slave_statements)
return 0;
@@ -1216,21 +1237,17 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length,
/* fill in user_host value: the format is "%s[%s] @ %s [%s]" */
user_host_len= (strxnmov(user_host_buff, MAX_USER_HOST_SIZE,
sctx->priv_user ? sctx->priv_user : "", "[",
- sctx->user ? sctx->user : "", "] @ ",
+ sctx->user ? sctx->user : (thd->slave_thread ? "SQL_SLAVE" : ""), "] @ ",
sctx->host ? sctx->host : "", " [",
sctx->ip ? sctx->ip : "", "]", NullS) -
user_host_buff);
- current_time= my_time_possible_from_micro(current_utime);
- if (thd->start_utime)
- {
- query_utime= (current_utime - thd->start_utime);
- lock_utime= (thd->utime_after_lock - thd->start_utime);
- }
- else
- {
- query_utime= lock_utime= 0;
- }
+ DBUG_ASSERT(thd->start_utime);
+ DBUG_ASSERT(thd->start_time);
+ query_utime= (current_utime - thd->start_utime);
+ lock_utime= (thd->utime_after_lock - thd->start_utime);
+ my_hrtime_t current_time= { hrtime_from_time(thd->start_time) +
+ thd->start_time_sec_part + query_utime };
if (!query)
{
@@ -1239,8 +1256,19 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length,
query_length= command_name[thd->command].length;
}
+ if (!query_length)
+ {
+ /*
+ Not a real query; Reset counts for slow query logging
+ (QQ: Wonder if this is really needed)
+ */
+ thd->sent_row_count= thd->examined_row_count= 0;
+ thd->query_plan_flags= QPLAN_INIT;
+ thd->query_plan_fsort_passes= 0;
+ }
+
for (current_handler= slow_log_handler_list; *current_handler ;)
- error= (*current_handler++)->log_slow(thd, current_time, thd->start_time,
+ error= (*current_handler++)->log_slow(thd, current_time,
user_host_buff, user_host_len,
query_utime, lock_utime, is_command,
query, query_length) || error;
@@ -1257,7 +1285,7 @@ bool LOGGER::general_log_write(THD *thd, enum enum_server_command command,
Log_event_handler **current_handler= general_log_handler_list;
char user_host_buff[MAX_USER_HOST_SIZE + 1];
uint user_host_len= 0;
- time_t current_time;
+ my_hrtime_t current_time;
DBUG_ASSERT(thd);
@@ -1269,9 +1297,9 @@ bool LOGGER::general_log_write(THD *thd, enum enum_server_command command,
}
user_host_len= make_user_name(thd, user_host_buff);
- current_time= my_time(0);
+ current_time= my_hrtime();
- mysql_audit_general_log(thd, current_time,
+ mysql_audit_general_log(thd, hrtime_to_time(current_time),
user_host_buff, user_host_len,
command_name[(uint) command].str,
command_name[(uint) command].length,
@@ -1436,6 +1464,7 @@ void LOGGER::deactivate_log_handler(THD *thd, uint log_type)
{
my_bool *tmp_opt= 0;
MYSQL_LOG *file_log;
+ LINT_INIT(file_log);
switch (log_type) {
case QUERY_LOG_SLOW:
@@ -1575,6 +1604,7 @@ int binlog_init(void *p)
binlog_hton->commit= binlog_commit;
binlog_hton->rollback= binlog_rollback;
binlog_hton->prepare= binlog_prepare;
+ binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot;
binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN;
return 0;
}
@@ -1590,48 +1620,69 @@ static int binlog_close_connection(handlerton *hton, THD *thd)
return 0;
}
-/**
+/*
This function flushes a cache upon commit/rollback.
- @param thd The thread whose transaction should be flushed
- @param cache_data Pointer to the cache
- @param end_ev The end event either commit/rollback
- @param is_transactional The type of the cache: transactional or
- non-transactional
+ SYNOPSIS
+ binlog_flush_cache()
- @return
- nonzero if an error pops up when flushing the cache.
-*/
-static inline int
-binlog_flush_cache(THD *thd, binlog_cache_data* cache_data, Log_event *end_evt,
- bool is_transactional)
+ thd The thread whose transaction should be ended
+ cache_mngr Pointer to the binlog_cache_mngr to use
+ all True if the entire transaction should be ended, false if
+ only the statement transaction should be ended.
+ end_ev The end event to use (COMMIT, ROLLBACK, or commit XID)
+ using_stmt True if the statement cache should be flushed
+ using_trx True if the transaction cache should be flushed
+
+ DESCRIPTION
+
+ End the currently transaction or statement. The transaction can be either
+ a real transaction or a statement transaction.
+
+ This can be to commit a transaction, with a COMMIT query event or an XA
+ commit XID event. But it can also be to rollback a transaction with a
+ ROLLBACK query event, used for rolling back transactions which also
+ contain updates to non-transactional tables. Or it can be a flush of
+ a statement cache.
+ */
+static int
+binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
+ Log_event *end_ev, bool all, bool using_stmt,
+ bool using_trx)
{
- DBUG_ENTER("binlog_flush_cache");
int error= 0;
+ DBUG_ENTER("binlog_flush_cache");
- if (!cache_data->empty())
+ if ((using_stmt && !cache_mngr->stmt_cache.empty()) ||
+ (using_trx && !cache_mngr->trx_cache.empty()))
{
- if (thd->binlog_flush_pending_rows_event(TRUE, is_transactional))
+ if (using_stmt && thd->binlog_flush_pending_rows_event(TRUE, FALSE))
DBUG_RETURN(1);
+ if (using_trx && thd->binlog_flush_pending_rows_event(TRUE, TRUE))
+ DBUG_RETURN(1);
+
/*
Doing a commit or a rollback including non-transactional tables,
i.e., ending a transaction where we might write the transaction
cache to the binary log.
We can always end the statement when ending a transaction since
- transactions are not allowed inside stored functions. If they
+ transactions are not allowed inside stored functions. If they
were, we would have to ensure that we're not ending a statement
inside a stored function.
*/
- error= mysql_bin_log.write(thd, &cache_data->cache_log, end_evt,
- cache_data->has_incident());
+ error= mysql_bin_log.write_transaction_to_binlog(thd, cache_mngr,
+ end_ev, all,
+ using_stmt, using_trx);
}
- cache_data->reset();
+ cache_mngr->reset(using_stmt, using_trx);
- DBUG_ASSERT(cache_data->empty());
+ DBUG_ASSERT((!using_stmt || cache_mngr->stmt_cache.empty()) &&
+ (!using_trx || cache_mngr->trx_cache.empty()));
DBUG_RETURN(error);
}
+
/**
This function flushes the stmt-cache upon commit.
@@ -1642,13 +1693,12 @@ binlog_flush_cache(THD *thd, binlog_cache_data* cache_data, Log_event *end_evt,
nonzero if an error pops up when flushing the cache.
*/
static inline int
-binlog_commit_flush_stmt_cache(THD *thd,
+binlog_commit_flush_stmt_cache(THD *thd, bool all,
binlog_cache_mngr *cache_mngr)
{
Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
- FALSE, FALSE, TRUE, 0);
- return (binlog_flush_cache(thd, &cache_mngr->stmt_cache, &end_evt,
- FALSE));
+ FALSE, TRUE, TRUE, 0);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE));
}
/**
@@ -1661,12 +1711,11 @@ binlog_commit_flush_stmt_cache(THD *thd,
nonzero if an error pops up when flushing the cache.
*/
static inline int
-binlog_commit_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr)
+binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr)
{
Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
- TRUE, FALSE, TRUE, 0);
- return (binlog_flush_cache(thd, &cache_mngr->trx_cache, &end_evt,
- TRUE));
+ TRUE, TRUE, TRUE, 0);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE));
}
/**
@@ -1679,12 +1728,12 @@ binlog_commit_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr)
nonzero if an error pops up when flushing the cache.
*/
static inline int
-binlog_rollback_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr)
+binlog_rollback_flush_trx_cache(THD *thd, bool all,
+ binlog_cache_mngr *cache_mngr)
{
Query_log_event end_evt(thd, STRING_WITH_LEN("ROLLBACK"),
- TRUE, FALSE, TRUE, 0);
- return (binlog_flush_cache(thd, &cache_mngr->trx_cache, &end_evt,
- TRUE));
+ TRUE, TRUE, TRUE, 0);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE));
}
/**
@@ -1698,12 +1747,26 @@ binlog_rollback_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr)
nonzero if an error pops up when flushing the cache.
*/
static inline int
-binlog_commit_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr,
- my_xid xid)
+binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr,
+ bool all, my_xid xid)
{
- Xid_log_event end_evt(thd, xid);
- return (binlog_flush_cache(thd, &cache_mngr->trx_cache, &end_evt,
- TRUE));
+ if (xid)
+ {
+ Xid_log_event end_evt(thd, xid, TRUE);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE));
+ }
+ else
+ {
+ /*
+ Empty xid occurs in XA COMMIT ... ONE PHASE.
+ In this case, we do not have a MySQL xid for the transaction, and the
+ external XA transaction coordinator will have to handle recovery if
+ needed. So we end the transaction with a plain COMMIT query event.
+ */
+ Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
+ TRUE, TRUE, TRUE, 0);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE));
+ }
}
/**
@@ -1742,11 +1805,11 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
if (ending_trans(thd, all))
{
if (cache_mngr->trx_cache.has_incident())
- error= mysql_bin_log.write_incident(thd, TRUE);
+ error= mysql_bin_log.write_incident(thd);
thd->clear_binlog_table_maps();
- cache_mngr->reset_cache(&cache_mngr->trx_cache);
+ cache_mngr->reset(false, true);
}
/*
If rolling back a statement in a transaction, we truncate the
@@ -1765,7 +1828,7 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all)
do nothing.
just pretend we can do 2pc, so that MySQL won't
switch to 1pc.
- real work will be done in MYSQL_BIN_LOG::log_xid()
+ real work will be done in MYSQL_BIN_LOG::log_and_order()
*/
return 0;
}
@@ -1798,7 +1861,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
if (!cache_mngr->stmt_cache.empty())
{
- error= binlog_commit_flush_stmt_cache(thd, cache_mngr);
+ error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr);
}
if (cache_mngr->trx_cache.empty())
@@ -1806,7 +1869,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
/*
we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid()
*/
- cache_mngr->reset_cache(&cache_mngr->trx_cache);
+ cache_mngr->reset(false, true);
DBUG_RETURN(error);
}
@@ -1817,7 +1880,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
Otherwise, we accumulate the changes.
*/
if (!error && ending_trans(thd, all))
- error= binlog_commit_flush_trx_cache(thd, cache_mngr);
+ error= binlog_commit_flush_trx_cache(thd, all, cache_mngr);
/*
This is part of the stmt rollback.
@@ -1856,12 +1919,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
*/
if (cache_mngr->stmt_cache.has_incident())
{
- error= mysql_bin_log.write_incident(thd, TRUE);
- cache_mngr->reset_cache(&cache_mngr->stmt_cache);
+ error= mysql_bin_log.write_incident(thd);
+ cache_mngr->reset(true, false);
}
else if (!cache_mngr->stmt_cache.empty())
{
- error= binlog_commit_flush_stmt_cache(thd, cache_mngr);
+ error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr);
}
if (cache_mngr->trx_cache.empty())
@@ -1869,7 +1932,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
/*
we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid()
*/
- cache_mngr->reset_cache(&cache_mngr->trx_cache);
+ cache_mngr->reset(false, true);
DBUG_RETURN(error);
}
@@ -1909,7 +1972,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
(trans_has_updated_non_trans_table(thd) &&
ending_single_stmt_trans(thd,all) &&
thd->variables.binlog_format == BINLOG_FORMAT_MIXED)))
- error= binlog_rollback_flush_trx_cache(thd, cache_mngr);
+ error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr);
/*
Truncate the cache if:
. aborting a single or multi-statement transaction or;
@@ -1987,6 +2050,7 @@ bool MYSQL_BIN_LOG::check_write_error(THD *thd)
DBUG_RETURN(checked);
}
+
/**
@note
How do we handle this (unlikely but legal) case:
@@ -2025,7 +2089,7 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
log_query.append("`"))
DBUG_RETURN(1);
int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
- Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(),
+ Query_log_event qinfo(thd, log_query.ptr(), log_query.length(),
TRUE, FALSE, TRUE, errcode);
DBUG_RETURN(mysql_bin_log.write(&qinfo));
}
@@ -2049,7 +2113,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
log_query.append("`"))
DBUG_RETURN(1);
int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
- Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(),
+ Query_log_event qinfo(thd, log_query.ptr(), log_query.length(),
TRUE, FALSE, TRUE, errcode);
DBUG_RETURN(mysql_bin_log.write(&qinfo));
}
@@ -2060,17 +2124,17 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
int check_binlog_magic(IO_CACHE* log, const char** errmsg)
{
- char magic[4];
+ uchar magic[4];
DBUG_ASSERT(my_b_tell(log) == 0);
- if (my_b_read(log, (uchar*) magic, sizeof(magic)))
+ if (my_b_read(log, magic, sizeof(magic)))
{
*errmsg = "I/O error reading the header from the binary log";
sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
log->error);
return 1;
}
- if (memcmp(magic, BINLOG_MAGIC, sizeof(magic)))
+ if (bcmp(magic, BINLOG_MAGIC, sizeof(magic)))
{
*errmsg = "Binlog has bad magic number; It's not a binary log file that can be used by this version of MySQL";
return 1;
@@ -2177,6 +2241,7 @@ static int find_uniq_filename(char *name)
char *start, *end;
int error= 0;
DBUG_ENTER("find_uniq_filename");
+ LINT_INIT(number);
length= dirname_part(buff, name, &buf_length);
start= name + length;
@@ -2635,7 +2700,6 @@ err:
thd THD of the query
current_time current timestamp
- query_start_arg command start timestamp
user_host the pointer to the string with user@host info
user_host_len length of the user_host string. this is computed once
and passed to all general log event handlers
@@ -2657,7 +2721,7 @@ err:
*/
bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
- time_t query_start_arg, const char *user_host,
+ const char *user_host,
uint user_host_len, ulonglong query_utime,
ulonglong lock_utime, bool is_command,
const char *sql_text, uint sql_text_len)
@@ -2707,19 +2771,39 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
if (my_b_write(&log_file, (uchar*) "\n", 1))
tmp_errno= errno;
}
+
/* For slow query log */
sprintf(query_time_buff, "%.6f", ulonglong2double(query_utime)/1000000.0);
sprintf(lock_time_buff, "%.6f", ulonglong2double(lock_utime)/1000000.0);
if (my_b_printf(&log_file,
- "# Query_time: %s Lock_time: %s"
- " Rows_sent: %lu Rows_examined: %lu\n",
+ "# Thread_id: %lu Schema: %s QC_hit: %s\n" \
+ "# Query_time: %s Lock_time: %s Rows_sent: %lu Rows_examined: %lu\n",
+ (ulong) thd->thread_id, (thd->db ? thd->db : ""),
+ ((thd->query_plan_flags & QPLAN_QC) ? "Yes" : "No"),
query_time_buff, lock_time_buff,
(ulong) thd->sent_row_count,
- (ulong) thd->examined_row_count) == (uint) -1)
+ (ulong) thd->examined_row_count) == (size_t) -1)
tmp_errno= errno;
+ if ((thd->variables.log_slow_verbosity & LOG_SLOW_VERBOSITY_QUERY_PLAN) &&
+ (thd->query_plan_flags &
+ (QPLAN_FULL_SCAN | QPLAN_FULL_JOIN | QPLAN_TMP_TABLE |
+ QPLAN_TMP_DISK | QPLAN_FILESORT | QPLAN_FILESORT_DISK)) &&
+ my_b_printf(&log_file,
+ "# Full_scan: %s Full_join: %s "
+ "Tmp_table: %s Tmp_table_on_disk: %s\n"
+ "# Filesort: %s Filesort_on_disk: %s Merge_passes: %lu\n",
+ ((thd->query_plan_flags & QPLAN_FULL_SCAN) ? "Yes" : "No"),
+ ((thd->query_plan_flags & QPLAN_FULL_JOIN) ? "Yes" : "No"),
+ ((thd->query_plan_flags & QPLAN_TMP_TABLE) ? "Yes" : "No"),
+ ((thd->query_plan_flags & QPLAN_TMP_DISK) ? "Yes" : "No"),
+ ((thd->query_plan_flags & QPLAN_FILESORT) ? "Yes" : "No"),
+ ((thd->query_plan_flags & QPLAN_FILESORT_DISK) ?
+ "Yes" : "No"),
+ thd->query_plan_fsort_passes) == (size_t) -1)
+ tmp_errno= errno;
if (thd->db && strcmp(thd->db, db))
{ // Database changed
- if (my_b_printf(&log_file,"use %s;\n",thd->db) == (uint) -1)
+ if (my_b_printf(&log_file,"use %s;\n",thd->db) == (size_t) -1)
tmp_errno= errno;
strmov(db,thd->db);
}
@@ -2814,8 +2898,12 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
:bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
need_start_event(TRUE),
+ group_commit_queue(0), group_commit_queue_busy(FALSE),
+ num_commits(0), num_group_commits(0),
sync_period_ptr(sync_period),
is_relay_log(0), signal_cnt(0),
+ checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF),
+ relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
description_event_for_exec(0), description_event_for_queue(0)
{
/*
@@ -2863,7 +2951,9 @@ void MYSQL_BIN_LOG::init_pthread_objects()
{
MYSQL_LOG::init_pthread_objects();
mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW);
+ mysql_mutex_setflags(&LOCK_index, MYF_NO_DEADLOCK_DETECTION);
mysql_cond_init(m_key_update_cond, &update_cond, 0);
+ mysql_cond_init(m_key_COND_queue_busy, &COND_queue_busy, 0);
}
@@ -3029,7 +3119,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
an extension for the binary log files.
In this case we write a standard header to it.
*/
- if (my_b_safe_write(&log_file, (uchar*) BINLOG_MAGIC,
+ if (my_b_safe_write(&log_file, BINLOG_MAGIC,
BIN_LOG_HEADER_SIZE))
goto err;
bytes_written+= BIN_LOG_HEADER_SIZE;
@@ -3048,7 +3138,19 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
as we won't be able to reset it later
*/
if (io_cache_type == WRITE_CACHE)
- s.flags|= LOG_EVENT_BINLOG_IN_USE_F;
+ s.flags |= LOG_EVENT_BINLOG_IN_USE_F;
+ s.checksum_alg= is_relay_log ?
+ /* relay-log */
+ /* inherit master's A descriptor if one has been received */
+ (relay_log_checksum_alg=
+ (relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
+ relay_log_checksum_alg :
+ /* otherwise use slave's local preference of RL events verification */
+ (opt_slave_sql_verify_checksum == 0) ?
+ (uint8) BINLOG_CHECKSUM_ALG_OFF : (uint8) binlog_checksum_options):
+ /* binlog */
+ (uint8) binlog_checksum_options;
+ DBUG_ASSERT(s.checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if (!s.is_valid())
goto err;
s.dont_set_created= null_created_arg;
@@ -3090,6 +3192,11 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
if (flush_io_cache(&log_file) ||
mysql_file_sync(log_file.file, MYF(MY_WME)))
goto err;
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ strmake(last_commit_pos_file, log_file_name,
+ sizeof(last_commit_pos_file)-1);
+ last_commit_pos_offset= my_b_tell(&log_file);
+ mysql_mutex_unlock(&LOCK_commit_ordered);
if (write_file_name_to_index_file)
{
@@ -3137,11 +3244,7 @@ To turn it on again: fix the cause, \
shutdown the MySQL server and restart it.", name, errno);
if (file >= 0)
mysql_file_close(file, MYF(0));
- end_io_cache(&log_file);
- end_io_cache(&index_file);
- my_free(name);
- name= NULL;
- log_state= LOG_CLOSED;
+ close(LOG_CLOSE_INDEX);
DBUG_RETURN(1);
}
@@ -3374,6 +3477,13 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
ha_reset_logs(thd);
/*
+ We need to get both locks to be sure that no one is trying to
+ write to the index log file.
+ */
+ mysql_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_index);
+
+ /*
The following mutex is needed to ensure that no threads call
'delete thd' as we would then risk missing a 'rollback' from this
thread. If the transaction involved MyISAM tables, it should go
@@ -3381,13 +3491,6 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
*/
mysql_mutex_lock(&LOCK_thread_count);
- /*
- We need to get both locks to be sure that no one is trying to
- write to the index log file.
- */
- mysql_mutex_lock(&LOCK_log);
- mysql_mutex_lock(&LOCK_index);
-
/* Save variables so that we can reopen the log */
save_name=name;
name=0; // Protect against free
@@ -3798,8 +3901,7 @@ int MYSQL_BIN_LOG::close_purge_index_file()
bool MYSQL_BIN_LOG::is_inited_purge_index_file()
{
- DBUG_ENTER("MYSQL_BIN_LOG::is_inited_purge_index_file");
- DBUG_RETURN (my_b_inited(&purge_index_file));
+ return my_b_inited(&purge_index_file);
}
int MYSQL_BIN_LOG::sync_purge_index_file()
@@ -3835,13 +3937,12 @@ int MYSQL_BIN_LOG::register_create_index_entry(const char *entry)
int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *decrease_log_space,
bool need_mutex)
{
+ DBUG_ENTER("MYSQL_BIN_LOG:purge_index_entry");
MY_STAT s;
int error= 0;
LOG_INFO log_info;
LOG_INFO check_log_info;
- DBUG_ENTER("MYSQL_BIN_LOG:purge_index_entry");
-
DBUG_ASSERT(my_b_inited(&purge_index_file));
if ((error=reinit_io_cache(&purge_index_file, READ_CACHE, 0, 0, 0)))
@@ -4230,8 +4331,15 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
We log the whole file name for log file as the user may decide
to change base names at some point.
*/
- Rotate_log_event r(new_name+dirname_length(new_name),
- 0, LOG_EVENT_OFFSET, is_relay_log ? Rotate_log_event::RELAY_LOG : 0);
+ Rotate_log_event r(new_name+dirname_length(new_name), 0, LOG_EVENT_OFFSET,
+ is_relay_log ? Rotate_log_event::RELAY_LOG : 0);
+ /*
+ The current relay-log's closing Rotate event must have checksum
+ value computed with an algorithm of the last relay-logged FD event.
+ */
+ if (is_relay_log)
+ r.checksum_alg= relay_log_checksum_alg;
+ DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) ||
(error= r.write(&log_file)))
{
@@ -4252,7 +4360,12 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
old_name=name;
name=0; // Don't free name
close(LOG_CLOSE_TO_BE_OPENED | LOG_CLOSE_INDEX);
-
+ if (log_type == LOG_BIN && checksum_alg_reset != BINLOG_CHECKSUM_ALG_UNDEF)
+ {
+ DBUG_ASSERT(!is_relay_log);
+ DBUG_ASSERT(binlog_checksum_options != checksum_alg_reset);
+ binlog_checksum_options= checksum_alg_reset;
+ }
/*
Note that at this point, log_state != LOG_CLOSED (important for is_open()).
*/
@@ -4394,6 +4507,10 @@ bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
err= mysql_file_sync(fd, MYF(MY_WME));
if (synced)
*synced= 1;
+#ifndef DBUG_OFF
+ if (opt_binlog_dbug_fsync_sleep > 0)
+ my_sleep(opt_binlog_dbug_fsync_sleep);
+#endif
}
return err;
}
@@ -4654,12 +4771,34 @@ void THD::binlog_set_stmt_begin() {
cache_mngr->trx_cache.set_prev_position(pos);
}
+static int
+binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
+{
+ int err= 0;
+ DBUG_ENTER("binlog_start_consistent_snapshot");
+
+ thd->binlog_setup_trx_data();
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+
+ /* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */
+ strmake(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file,
+ sizeof(cache_mngr->last_commit_pos_file)-1);
+ cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset;
+
+ trans_register_ha(thd, TRUE, hton);
+
+ DBUG_RETURN(err);
+}
/**
This function writes a table map to the binary log.
Note that in order to keep the signature uniform with related methods,
we use a redundant parameter to indicate whether a transactional table
was changed or not.
+
+ If with_annotate != NULL and
+ *with_annotate = TRUE write also Annotate_rows before the table map.
@param table a pointer to the table.
@param is_transactional @c true indicates a transactional table,
@@ -4667,7 +4806,8 @@ void THD::binlog_set_stmt_begin() {
@return
nonzero if an error pops up when writing the table map event.
*/
-int THD::binlog_write_table_map(TABLE *table, bool is_transactional)
+int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
+ my_bool *with_annotate)
{
int error;
DBUG_ENTER("THD::binlog_write_table_map");
@@ -4690,6 +4830,14 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional)
IO_CACHE *file=
cache_mngr->get_binlog_cache_log(use_trans_cache(this, is_transactional));
+ if (with_annotate && *with_annotate)
+ {
+ Annotate_rows_log_event anno(current_thd, is_transactional);
+ /* Annotate event should be written not more than once */
+ *with_annotate= 0;
+ if ((error= anno.write(file)))
+ DBUG_RETURN(error);
+ }
if ((error= the_event.write(file)))
DBUG_RETURN(error);
@@ -4844,10 +4992,12 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
}
/**
- Write an event to the binary log.
+ Write an event to the binary log. If with_annotate != NULL and
+ *with_annotate = TRUE write also Annotate_rows before the event
+ (this should happen only if the event is a Table_map).
*/
-bool MYSQL_BIN_LOG::write(Log_event *event_info)
+bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
{
THD *thd= event_info->thd;
bool error= 1;
@@ -4885,6 +5035,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
*/
if (likely(is_open()))
{
+ my_off_t UNINIT_VAR(my_org_b_tell);
#ifdef HAVE_REPLICATION
/*
In the future we need to add to the following if tests like
@@ -4892,7 +5043,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
binlog_[wild_]{do|ignore}_table?" (WL#1049)"
*/
const char *local_db= event_info->get_db();
- if ((thd && !(thd->variables.option_bits & OPTION_BIN_LOG)) ||
+ if ((!(thd->variables.option_bits & OPTION_BIN_LOG)) ||
(thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT &&
thd->lex->sql_command != SQLCOM_SAVEPOINT &&
!binlog_filter->db_ok(local_db)))
@@ -4904,6 +5055,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
if (event_info->use_direct_logging())
{
file= &log_file;
+ my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log);
}
else
@@ -4933,10 +5085,22 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
of the SQL command. If row-based binlogging, Insert_id, Rand
and other kind of "setting context" events are not needed.
*/
+
+ if (with_annotate && *with_annotate)
+ {
+ DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT);
+ Annotate_rows_log_event anno(thd, event_info->cache_type);
+ /* Annotate event should be written not more than once */
+ *with_annotate= 0;
+ if (anno.write(file))
+ goto err;
+ }
+
if (thd)
{
if (!thd->is_current_stmt_binlog_format_row())
{
+
if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt)
{
Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
@@ -4998,12 +5162,18 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
err:
if (event_info->use_direct_logging())
{
+ my_off_t offset= my_b_tell(file);
+
if (!error)
{
bool synced;
+
if ((error= flush_and_sync(&synced)))
goto unlock;
+ status_var_add(thd->status_var.binlog_bytes_written,
+ offset - my_org_b_tell);
+
if ((error= RUN_HOOK(binlog_storage, after_flush,
(thd, log_file_name, file->pos_in_file, synced))))
{
@@ -5013,7 +5183,15 @@ err:
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
+
unlock:
+ /*
+ Take mutex to protect against a reader seeing partial writes of 64-bit
+ offset on 32-bit CPUs.
+ */
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ last_commit_pos_offset= offset;
+ mysql_mutex_unlock(&LOCK_commit_ordered);
mysql_mutex_unlock(&LOCK_log);
}
@@ -5130,12 +5308,14 @@ int MYSQL_BIN_LOG::rotate_and_purge(uint flags)
We give it a shot and try to write an incident event anyway
to the current log.
*/
- if (!write_incident(current_thd, FALSE))
+ if (!write_incident_already_locked(current_thd))
flush_and_sync(0);
#ifdef HAVE_REPLICATION
check_purge= true;
#endif
+ if (flags & RP_BINLOG_CHECKSUM_ALG_CHANGE)
+ checksum_alg_reset= BINLOG_CHECKSUM_ALG_UNDEF; // done
}
if (!(flags & RP_LOCK_LOG_IS_ALREADY_LOCKED))
mysql_mutex_unlock(&LOCK_log);
@@ -5164,29 +5344,67 @@ uint MYSQL_BIN_LOG::next_file_id()
}
+/**
+ Calculate checksum of possibly a part of an event containing at least
+ the whole common header.
+
+ @param buf the pointer to trans cache's buffer
+ @param off the offset of the beginning of the event in the buffer
+ @param event_len no-checksum length of the event
+ @param length the current size of the buffer
+
+ @param crc [in-out] the checksum
+
+ Event size in incremented by @c BINLOG_CHECKSUM_LEN.
+
+ @return 0 or number of unprocessed yet bytes of the event excluding
+ the checksum part.
+*/
+ static ulong fix_log_event_crc(uchar *buf, uint off, uint event_len,
+ uint length, ha_checksum *crc)
+{
+ ulong ret;
+ uchar *event_begin= buf + off;
+
+ ret= length >= off + event_len ? 0 : off + event_len - length;
+ *crc= my_checksum(*crc, event_begin, event_len - ret);
+ return ret;
+}
+
/*
Write the contents of a cache to the binary log.
SYNOPSIS
write_cache()
+ thd Current_thread
cache Cache to write to the binary log
- lock_log True if the LOCK_log mutex should be aquired, false otherwise
- sync_log True if the log should be flushed and synced
DESCRIPTION
Write the contents of the cache to the binary log. The cache will
be reset as a READ_CACHE to be able to read the contents from it.
- */
-int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
-{
- Mutex_sentry sentry(lock_log ? &LOCK_log : NULL);
+ Reading from the trans cache with possible (per @c binlog_checksum_options)
+ adding checksum value and then fixing the length and the end_log_pos of
+ events prior to fill in the binlog cache.
+*/
+int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
+{
+ mysql_mutex_assert_owner(&LOCK_log);
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
return ER_ERROR_ON_WRITE;
uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
+ ulong remains= 0; // part of unprocessed yet netto length of the event
long val;
+ ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
uchar header[LOG_EVENT_HEADER_LEN];
+ ha_checksum crc= 0, crc_0= 0; // assignments to keep compiler happy
+ my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF);
+ uchar buf[BINLOG_CHECKSUM_LEN];
+
+ // while there is just one alg the following must hold:
+ DBUG_ASSERT(!do_checksum ||
+ binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32);
/*
The events in the buffer have incorrect end_log_pos data
@@ -5204,6 +5422,8 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
group= (uint)my_b_tell(&log_file);
hdr_offs= carry= 0;
+ if (do_checksum)
+ crc= crc_0= my_checksum(0L, NULL, 0);
do
{
@@ -5216,25 +5436,44 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN);
/* assemble both halves */
- memcpy(&header[carry], (char *)cache->read_pos, LOG_EVENT_HEADER_LEN - carry);
+ memcpy(&header[carry], (char *)cache->read_pos,
+ LOG_EVENT_HEADER_LEN - carry);
/* fix end_log_pos */
- val= uint4korr(&header[LOG_POS_OFFSET]) + group;
+ val= uint4korr(&header[LOG_POS_OFFSET]) + group +
+ (end_log_pos_inc+= (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
int4store(&header[LOG_POS_OFFSET], val);
+ if (do_checksum)
+ {
+ ulong len= uint4korr(&header[EVENT_LEN_OFFSET]);
+ /* fix len */
+ int4store(&header[EVENT_LEN_OFFSET], len + BINLOG_CHECKSUM_LEN);
+ }
+
/* write the first half of the split header */
if (my_b_write(&log_file, header, carry))
return ER_ERROR_ON_WRITE;
+ status_var_add(thd->status_var.binlog_bytes_written, carry);
/*
copy fixed second half of header to cache so the correct
version will be written later.
*/
- memcpy((char *)cache->read_pos, &header[carry], LOG_EVENT_HEADER_LEN - carry);
+ memcpy((char *)cache->read_pos, &header[carry],
+ LOG_EVENT_HEADER_LEN - carry);
/* next event header at ... */
- hdr_offs = uint4korr(&header[EVENT_LEN_OFFSET]) - carry;
+ hdr_offs= uint4korr(&header[EVENT_LEN_OFFSET]) - carry -
+ (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
+ if (do_checksum)
+ {
+ DBUG_ASSERT(crc == crc_0 && remains == 0);
+ crc= my_checksum(crc, header, carry);
+ remains= uint4korr(header + EVENT_LEN_OFFSET) - carry -
+ BINLOG_CHECKSUM_LEN;
+ }
carry= 0;
}
@@ -5249,6 +5488,25 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
very next iteration, just "eventually").
*/
+ /* crc-calc the whole buffer */
+ if (do_checksum && hdr_offs >= length)
+ {
+
+ DBUG_ASSERT(remains != 0 && crc != crc_0);
+
+ crc= my_checksum(crc, cache->read_pos, length);
+ remains -= length;
+ if (my_b_write(&log_file, cache->read_pos, length))
+ return ER_ERROR_ON_WRITE;
+ if (remains == 0)
+ {
+ int4store(buf, crc);
+ if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
+ return ER_ERROR_ON_WRITE;
+ crc= crc_0;
+ }
+ }
+
while (hdr_offs < length)
{
/*
@@ -5256,6 +5514,26 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
we get the rest.
*/
+ if (do_checksum)
+ {
+ if (remains != 0)
+ {
+ /*
+ finish off with remains of the last event that crawls
+ from previous into the current buffer
+ */
+ DBUG_ASSERT(crc != crc_0);
+ crc= my_checksum(crc, cache->read_pos, hdr_offs);
+ int4store(buf, crc);
+ remains -= hdr_offs;
+ DBUG_ASSERT(remains == 0);
+ if (my_b_write(&log_file, cache->read_pos, hdr_offs) ||
+ my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
+ return ER_ERROR_ON_WRITE;
+ crc= crc_0;
+ }
+ }
+
if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
{
carry= length - hdr_offs;
@@ -5265,17 +5543,38 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
else
{
/* we've got a full event-header, and it came in one piece */
-
- uchar *log_pos= (uchar *)cache->read_pos + hdr_offs + LOG_POS_OFFSET;
+ uchar *ev= (uchar *)cache->read_pos + hdr_offs;
+ uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
+ uchar *log_pos= ev + LOG_POS_OFFSET;
/* fix end_log_pos */
- val= uint4korr(log_pos) + group;
+ val= uint4korr(log_pos) + group +
+ (end_log_pos_inc += (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
int4store(log_pos, val);
+ /* fix CRC */
+ if (do_checksum)
+ {
+ /* fix length */
+ int4store(ev + EVENT_LEN_OFFSET, event_len + BINLOG_CHECKSUM_LEN);
+ remains= fix_log_event_crc(cache->read_pos, hdr_offs, event_len,
+ length, &crc);
+ if (my_b_write(&log_file, ev,
+ remains == 0 ? event_len : length - hdr_offs))
+ return ER_ERROR_ON_WRITE;
+ if (remains == 0)
+ {
+ int4store(buf, crc);
+ if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
+ return ER_ERROR_ON_WRITE;
+ crc= crc_0; // crc is complete
+ }
+ }
+
/* next event header at ... */
- log_pos= (uchar *)cache->read_pos + hdr_offs + EVENT_LEN_OFFSET;
- hdr_offs += uint4korr(log_pos);
+ hdr_offs += event_len; // incr by the netto len
+ DBUG_ASSERT(!do_checksum || remains == 0 || hdr_offs >= length);
}
}
@@ -5291,15 +5590,19 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
}
/* Write data to the binary log file */
- if (my_b_write(&log_file, cache->read_pos, length))
- return ER_ERROR_ON_WRITE;
+ DBUG_EXECUTE_IF("fail_binlog_write_1",
+ errno= 28; return ER_ERROR_ON_WRITE;);
+ if (!do_checksum)
+ if (my_b_write(&log_file, cache->read_pos, length))
+ return ER_ERROR_ON_WRITE;
+ status_var_add(thd->status_var.binlog_bytes_written, length);
+
cache->read_pos=cache->read_end; // Mark buffer used up
} while ((length= my_b_fill(cache)));
DBUG_ASSERT(carry == 0);
-
- if (sync_log)
- return flush_and_sync(0);
+ DBUG_ASSERT(!do_checksum || remains == 0);
+ DBUG_ASSERT(!do_checksum || crc == crc_0);
return 0; // All OK
}
@@ -5333,30 +5636,50 @@ int query_error_code(THD *thd, bool not_killed)
return error;
}
-bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
+
+bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd)
{
uint error= 0;
- DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
-
- if (!is_open())
- DBUG_RETURN(error);
-
- LEX_STRING const write_error_msg=
- { C_STRING_WITH_LEN("error writing to the binary log") };
+ DBUG_ENTER("MYSQL_BIN_LOG::write_incident_already_locked");
Incident incident= INCIDENT_LOST_EVENTS;
Incident_log_event ev(thd, incident, write_error_msg);
- if (lock)
- mysql_mutex_lock(&LOCK_log);
- error= ev.write(&log_file);
- if (lock)
+
+ if (likely(is_open()))
+ {
+ error= ev.write(&log_file);
+ status_var_add(thd->status_var.binlog_bytes_written, ev.data_written);
+ }
+
+ DBUG_RETURN(error);
+}
+
+
+bool MYSQL_BIN_LOG::write_incident(THD *thd)
+{
+ uint error= 0;
+ my_off_t offset;
+ DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
+
+ mysql_mutex_lock(&LOCK_log);
+ if (likely(is_open()))
{
- if (!error && !(error= flush_and_sync(0)))
+ if (!(error= write_incident_already_locked(thd)) &&
+ !(error= flush_and_sync(0)))
{
signal_update();
error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
- mysql_mutex_unlock(&LOCK_log);
+ offset= my_b_tell(&log_file);
+ /*
+ Take mutex to protect against a reader seeing partial writes of 64-bit
+ offset on 32-bit CPUs.
+ */
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ last_commit_pos_offset= offset;
+ mysql_mutex_unlock(&LOCK_commit_ordered);
}
+ mysql_mutex_unlock(&LOCK_log);
+
DBUG_RETURN(error);
}
@@ -5384,102 +5707,425 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
'cache' needs to be reinitialized after this functions returns.
*/
-bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
- bool incident)
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
+ binlog_cache_mngr *cache_mngr,
+ Log_event *end_ev, bool all,
+ bool using_stmt_cache,
+ bool using_trx_cache)
+{
+ group_commit_entry entry;
+ DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
+
+ entry.thd= thd;
+ entry.cache_mngr= cache_mngr;
+ entry.error= 0;
+ entry.all= all;
+ entry.using_stmt_cache= using_stmt_cache;
+ entry.using_trx_cache= using_trx_cache;
+
+ /*
+ Log "BEGIN" at the beginning of every transaction. Here, a transaction is
+ either a BEGIN..COMMIT block or a single statement in autocommit mode.
+
+ Create the necessary events here, where we have the correct THD (and
+ thread context).
+
+ Due to group commit the actual writing to binlog may happen in a different
+ thread.
+ */
+ Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE,
+ TRUE, 0);
+ entry.begin_event= &qinfo;
+ entry.end_event= end_ev;
+ if (cache_mngr->stmt_cache.has_incident() ||
+ cache_mngr->trx_cache.has_incident())
+ {
+ Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg);
+ entry.incident_event= &inc_ev;
+ DBUG_RETURN(write_transaction_to_binlog_events(&entry));
+ }
+ else
+ {
+ entry.incident_event= NULL;
+ DBUG_RETURN(write_transaction_to_binlog_events(&entry));
+ }
+}
+
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
+{
+ /*
+ To facilitate group commit for the binlog, we first queue up ourselves in
+ the group commit queue. Then the first thread to enter the queue waits for
+ the LOCK_log mutex, and commits for everyone in the queue once it gets the
+ lock. Any other threads in the queue just wait for the first one to finish
+ the commit and wake them up.
+ */
+
+ entry->thd->clear_wakeup_ready();
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ group_commit_entry *orig_queue= group_commit_queue;
+ entry->next= orig_queue;
+ group_commit_queue= entry;
+
+ if (entry->cache_mngr->using_xa)
+ {
+ DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
+ run_prepare_ordered(entry->thd, entry->all);
+ DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
+ }
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+ DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered");
+
+ /*
+ The first in the queue handle group commit for all; the others just wait
+ to be signalled when group commit is done.
+ */
+ if (orig_queue != NULL)
+ entry->thd->wait_for_wakeup_ready();
+ else
+ trx_group_commit_leader(entry);
+
+ if (!opt_optimize_thread_scheduling)
+ {
+ /* For the leader, trx_group_commit_leader() already took the lock. */
+ if (orig_queue != NULL)
+ mysql_mutex_lock(&LOCK_commit_ordered);
+
+ DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered");
+ ++num_commits;
+ if (entry->cache_mngr->using_xa && !entry->error)
+ run_commit_ordered(entry->thd, entry->all);
+
+ group_commit_entry *next= entry->next;
+ if (!next)
+ {
+ group_commit_queue_busy= FALSE;
+ mysql_cond_signal(&COND_queue_busy);
+ DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered");
+ }
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+
+ if (next)
+ {
+ next->thd->signal_wakeup_ready();
+ }
+ }
+
+ if (likely(!entry->error))
+ return 0;
+
+ switch (entry->error)
+ {
+ case ER_ERROR_ON_WRITE:
+ my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, entry->commit_errno);
+ break;
+ case ER_ERROR_ON_READ:
+ my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH),
+ entry->error_cache->file_name, entry->commit_errno);
+ break;
+ default:
+ /*
+ There are not (and should not be) any errors thrown not covered above.
+ But just in case one is added later without updating the above switch
+ statement, include a catch-all.
+ */
+ my_printf_error(entry->error,
+ "Error writing transaction to binary log: %d",
+ MYF(ME_NOREFRESH), entry->error);
+ }
+
+ /*
+ Since we return error, this transaction XID will not be committed, so
+ we need to mark it as not needed for recovery (unlog() is not called
+ for a transaction if log_xid() fails).
+ */
+ if (entry->cache_mngr->using_xa && entry->cache_mngr->xa_xid)
+ mark_xid_done();
+
+ return 1;
+}
+
+/*
+ Do binlog group commit as the lead thread.
+
+ This must be called when this statement/transaction is queued at the start of
+ the group_commit_queue. It will wait to obtain the LOCK_log mutex, then group
+ commit all the transactions in the queue (more may have entered while waiting
+ for LOCK_log). After commit is done, all other threads in the queue will be
+ signalled.
+
+ */
+void
+MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{
- DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)");
+ uint xid_count= 0;
+ my_off_t commit_offset;
+ group_commit_entry *current;
+ group_commit_entry *last_in_queue;
+ DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader");
+ LINT_INIT(commit_offset);
+
+ /*
+ Lock the LOCK_log(), and once we get it, collect any additional writes
+ that queued up while we were waiting.
+ */
mysql_mutex_lock(&LOCK_log);
+ DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ current= group_commit_queue;
+ group_commit_queue= NULL;
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+
+ /* As the queue is in reverse order of entering, reverse it. */
+ group_commit_entry *queue= NULL;
+ last_in_queue= current;
+ while (current)
+ {
+ group_commit_entry *next= current->next;
+ current->next= queue;
+ queue= current;
+ current= next;
+ }
+ DBUG_ASSERT(leader == queue /* the leader should be first in queue */);
+
+ /* Now we have in queue the list of transactions to be committed in order. */
DBUG_ASSERT(is_open());
if (likely(is_open())) // Should always be true
{
/*
- We only bother to write to the binary log if there is anything
- to write.
- */
- if (my_b_tell(cache) > 0)
+ Commit every transaction in the queue.
+
+ Note that we are doing this in a different thread than the one running
+ the transaction! So we are limited in the operations we can do. In
+ particular, we cannot call my_error() on behalf of a transaction, as
+ that obtains the THD from thread local storage. Instead, we must set
+ current->error and let the thread do the error reporting itself once
+ we wake it up.
+ */
+ for (current= queue; current != NULL; current= current->next)
{
+ binlog_cache_mngr *cache_mngr= current->cache_mngr;
+
/*
- Log "BEGIN" at the beginning of every transaction. Here, a
- transaction is either a BEGIN..COMMIT block or a single
- statement in autocommit mode.
+ We already checked before that at least one cache is non-empty; if both
+ are empty we would have skipped calling into here.
*/
- Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE, TRUE, 0);
- if (qinfo.write(&log_file))
- goto err;
- DBUG_EXECUTE_IF("crash_before_writing_xid",
- {
- if ((write_error= write_cache(cache, false, true)))
- DBUG_PRINT("info", ("error writing binlog cache: %d",
- write_error));
- DBUG_PRINT("info", ("crashing before writing xid"));
- DBUG_SUICIDE();
- });
-
- if ((write_error= write_cache(cache, false, false)))
- goto err;
+ DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty());
- if (commit_event && commit_event->write(&log_file))
- goto err;
+ current->error= write_transaction_or_stmt(current);
- if (incident && write_incident(thd, FALSE))
- goto err;
+ strmake(cache_mngr->last_commit_pos_file, log_file_name,
+ sizeof(cache_mngr->last_commit_pos_file)-1);
+ commit_offset= my_b_write_tell(&log_file);
+ cache_mngr->last_commit_pos_offset= commit_offset;
+ if (cache_mngr->using_xa && cache_mngr->xa_xid)
+ xid_count++;
+ }
- bool synced= 0;
- if (flush_and_sync(&synced))
- goto err;
- DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_SUICIDE(););
- if (cache->error) // Error on read
+ bool synced= 0;
+ if (flush_and_sync(&synced))
+ {
+ for (current= queue; current != NULL; current= current->next)
{
- sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno);
- write_error=1; // Don't give more errors
- goto err;
+ if (!current->error)
+ {
+ current->error= ER_ERROR_ON_WRITE;
+ current->commit_errno= errno;
+ current->error_cache= NULL;
+ }
}
-
- if (RUN_HOOK(binlog_storage, after_flush,
- (thd, log_file_name, log_file.pos_in_file, synced)))
+ }
+ else
+ {
+ bool any_error= false;
+ bool all_error= true;
+ for (current= queue; current != NULL; current= current->next)
{
- sql_print_error("Failed to run 'after_flush' hooks");
- write_error=1;
- goto err;
+ if (!current->error &&
+ RUN_HOOK(binlog_storage, after_flush,
+ (current->thd, log_file_name, log_file.pos_in_file, synced)))
+ {
+ current->error= ER_ERROR_ON_WRITE;
+ current->commit_errno= -1;
+ current->error_cache= NULL;
+ any_error= true;
+ }
+ else
+ all_error= false;
}
- signal_update();
+ if (any_error)
+ sql_print_error("Failed to run 'after_flush' hooks");
+ if (!all_error)
+ signal_update();
}
/*
- if commit_event is Xid_log_event, increase the number of
- prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated
+ if any commit_events are Xid_log_event, increase the number of
+ prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated
if there're prepared xids in it - see the comment in new_file() for
an explanation.
- If the commit_event is not Xid_log_event (then it's a Query_log_event)
- rotate binlog, if necessary.
+ If no Xid_log_events (then it's all Query_log_event) rotate binlog,
+ if necessary.
*/
- if (commit_event && commit_event->get_type_code() == XID_EVENT)
+ if (xid_count > 0)
{
- mysql_mutex_lock(&LOCK_prep_xids);
- prepared_xids++;
- mysql_mutex_unlock(&LOCK_prep_xids);
+ mark_xids_active(xid_count);
}
else
+ {
if (rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED))
- goto err;
+ {
+ /*
+ If we fail to rotate, which thread should get the error?
+ We give the error to the *last* transaction thread; that seems to
+ make the most sense, as it was the last to write to the log.
+ */
+ last_in_queue->error= ER_ERROR_ON_WRITE;
+ last_in_queue->commit_errno= errno;
+ }
+ }
}
+
+ DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ last_commit_pos_offset= commit_offset;
+ /*
+ We cannot unlock LOCK_log until we have locked LOCK_commit_ordered;
+ otherwise scheduling could allow the next group commit to run ahead of us,
+ messing up the order of commit_ordered() calls. But as soon as
+ LOCK_commit_ordered is obtained, we can let the next group commit start.
+ */
mysql_mutex_unlock(&LOCK_log);
+ DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log");
+ ++num_group_commits;
- DBUG_RETURN(0);
+ if (!opt_optimize_thread_scheduling)
+ {
+ /*
+ If we want to run commit_ordered() each in the transaction's own thread
+ context, then we need to mark the queue reserved; we need to finish all
+ threads in one group commit before the next group commit can be allowed
+ to proceed, and we cannot unlock a simple pthreads mutex in a different
+ thread from the one that locked it.
+ */
-err:
- if (!write_error)
+ while (group_commit_queue_busy)
+ mysql_cond_wait(&COND_queue_busy, &LOCK_commit_ordered);
+ group_commit_queue_busy= TRUE;
+
+ /* Note that we return with LOCK_commit_ordered locked! */
+ DBUG_VOID_RETURN;
+ }
+
+ /*
+ Wakeup each participant waiting for our group commit, first calling the
+ commit_ordered() methods for any transactions doing 2-phase commit.
+ */
+ current= queue;
+ while (current != NULL)
{
- write_error= 1;
- sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
+ group_commit_entry *next;
+
+ DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered");
+ ++num_commits;
+ if (current->cache_mngr->using_xa && !current->error)
+ run_commit_ordered(current->thd, current->all);
+
+ /*
+ Careful not to access current->next after waking up the other thread! As
+ it may change immediately after wakeup.
+ */
+ next= current->next;
+ if (current != leader) // Don't wake up ourself
+ current->thd->signal_wakeup_ready();
+ current= next;
}
- mysql_mutex_unlock(&LOCK_log);
- DBUG_RETURN(1);
+ DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered");
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+
+ DBUG_VOID_RETURN;
}
+int
+MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry)
+{
+ binlog_cache_mngr *mngr= entry->cache_mngr;
+
+ if (entry->begin_event->write(&log_file))
+ return ER_ERROR_ON_WRITE;
+ status_var_add(entry->thd->status_var.binlog_bytes_written,
+ entry->begin_event->data_written);
+
+ if (entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
+ write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE)))
+ {
+ entry->error_cache= &mngr->stmt_cache.cache_log;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_WRITE;
+ }
+
+ if (entry->using_trx_cache && !mngr->trx_cache.empty())
+ {
+ DBUG_EXECUTE_IF("crash_before_writing_xid",
+ {
+ if ((write_cache(entry->thd,
+ mngr->get_binlog_cache_log(TRUE))))
+ DBUG_PRINT("info", ("error writing binlog cache"));
+ else
+ flush_and_sync(0);
+
+ DBUG_PRINT("info", ("crashing before writing xid"));
+ DBUG_SUICIDE();
+ });
+
+ if (write_cache(entry->thd, mngr->get_binlog_cache_log(TRUE)))
+ {
+ entry->error_cache= &mngr->trx_cache.cache_log;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_WRITE;
+ }
+ }
+
+ if (entry->end_event->write(&log_file))
+ {
+ entry->error_cache= NULL;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_WRITE;
+ }
+ status_var_add(entry->thd->status_var.binlog_bytes_written,
+ entry->end_event->data_written);
+
+ if (entry->incident_event)
+ {
+ if (entry->incident_event->write(&log_file))
+ {
+ entry->error_cache= NULL;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_WRITE;
+ }
+ }
+
+ if (mngr->get_binlog_cache_log(FALSE)->error) // Error on read
+ {
+ entry->error_cache= &mngr->stmt_cache.cache_log;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_READ;
+ }
+ if (mngr->get_binlog_cache_log(TRUE)->error) // Error on read
+ {
+ entry->error_cache= &mngr->trx_cache.cache_log;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_READ;
+ }
+
+ return 0;
+}
+
/**
Wait until we get a signal that the relay log has been updated.
@@ -5561,6 +6207,11 @@ void MYSQL_BIN_LOG::close(uint exiting)
(exiting & LOG_CLOSE_STOP_EVENT))
{
Stop_log_event s;
+ // the checksumming rule for relay-log case is similar to Rotate
+ s.checksum_alg= is_relay_log ?
+ (uint8) relay_log_checksum_alg : (uint8) binlog_checksum_options;
+ DBUG_ASSERT(!is_relay_log ||
+ relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
s.write(&log_file);
bytes_written+= s.data_written;
signal_update();
@@ -5677,9 +6328,23 @@ static bool test_if_number(register const char *str,
void sql_perror(const char *message)
{
-#ifdef HAVE_STRERROR
+#if defined(_WIN32)
+ char* buf;
+ DWORD dw= GetLastError();
+ if (FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
+ FORMAT_MESSAGE_IGNORE_INSERTS, NULL, dw,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPSTR)&buf, 0, NULL ) > 0)
+ {
+ sql_print_error("%s: %s",message, buf);
+ LocalFree((HLOCAL)buf);
+ }
+ else
+ {
+ sql_print_error("%s", message);
+ }
+#elif defined(HAVE_STRERROR)
sql_print_error("%s: %s",message, strerror(errno));
-#else
+#else
perror(message);
#endif
}
@@ -5884,42 +6549,184 @@ void sql_print_information(const char *format, ...)
}
+void
+TC_LOG::run_prepare_ordered(THD *thd, bool all)
+{
+ Ha_trx_info *ha_info=
+ all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list;
+
+ mysql_mutex_assert_owner(&LOCK_prepare_ordered);
+ for (; ha_info; ha_info= ha_info->next())
+ {
+ handlerton *ht= ha_info->ht();
+ if (!ht->prepare_ordered)
+ continue;
+ ht->prepare_ordered(ht, thd, all);
+ }
+}
+
+
+void
+TC_LOG::run_commit_ordered(THD *thd, bool all)
+{
+ Ha_trx_info *ha_info=
+ all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list;
+
+ mysql_mutex_assert_owner(&LOCK_commit_ordered);
+ for (; ha_info; ha_info= ha_info->next())
+ {
+ handlerton *ht= ha_info->ht();
+ if (!ht->commit_ordered)
+ continue;
+ ht->commit_ordered(ht, thd, all);
+ DEBUG_SYNC(thd, "commit_after_run_commit_ordered");
+ }
+}
+
+
+int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
+ bool need_prepare_ordered,
+ bool need_commit_ordered)
+{
+ int cookie;
+ struct commit_entry entry;
+ bool is_group_commit_leader;
+ LINT_INIT(is_group_commit_leader);
+
+ if (need_prepare_ordered)
+ {
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ run_prepare_ordered(thd, all);
+ if (need_commit_ordered)
+ {
+ /*
+ Must put us in queue so we can run_commit_ordered() in same sequence
+ as we did run_prepare_ordered().
+ */
+ thd->clear_wakeup_ready();
+ entry.thd= thd;
+ commit_entry *previous_queue= commit_ordered_queue;
+ entry.next= previous_queue;
+ commit_ordered_queue= &entry;
+ is_group_commit_leader= (previous_queue == NULL);
+ }
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+ }
+
+ cookie= 0;
+ if (xid)
+ cookie= log_one_transaction(xid);
+
+ if (need_commit_ordered)
+ {
+ if (need_prepare_ordered)
+ {
+ /*
+ We did the run_prepare_ordered() serialised, then ran the log_xid() in
+ parallel. Now we have to do run_commit_ordered() serialised in the
+ same sequence as run_prepare_ordered().
+
+ We do this starting from the head of the queue, each thread doing
+ run_commit_ordered() and signalling the next in queue.
+ */
+ if (is_group_commit_leader)
+ {
+ /* The first in queue starts the ball rolling. */
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ while (commit_ordered_queue_busy)
+ mysql_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered);
+ commit_entry *queue= commit_ordered_queue;
+ commit_ordered_queue= NULL;
+ /*
+ Mark the queue busy while we bounce it from one thread to the
+ next.
+ */
+ commit_ordered_queue_busy= true;
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+
+ /* Reverse the queue list so we get correct order. */
+ commit_entry *prev= NULL;
+ while (queue)
+ {
+ commit_entry *next= queue->next;
+ queue->next= prev;
+ prev= queue;
+ queue= next;
+ }
+ DBUG_ASSERT(prev == &entry && prev->thd == thd);
+ }
+ else
+ {
+ /* Not first in queue; just wait until previous thread wakes us up. */
+ thd->wait_for_wakeup_ready();
+ }
+ }
+
+ /* Only run commit_ordered() if log_xid was successful. */
+ if (cookie)
+ {
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ run_commit_ordered(thd, all);
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+ }
+
+ if (need_prepare_ordered)
+ {
+ commit_entry *next= entry.next;
+ if (next)
+ {
+ next->thd->signal_wakeup_ready();
+ }
+ else
+ {
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ commit_ordered_queue_busy= false;
+ mysql_cond_signal(&COND_queue_busy);
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+ }
+ }
+ }
+
+ return cookie;
+}
+
+
/********* transaction coordinator log for 2pc - mmap() based solution *******/
/*
- the log consists of a file, mmapped to a memory.
- file is divided on pages of tc_log_page_size size.
- (usable size of the first page is smaller because of log header)
- there's PAGE control structure for each page
- each page (or rather PAGE control structure) can be in one of three
- states - active, syncing, pool.
- there could be only one page in active or syncing states,
- but many in pool - pool is fifo queue.
- usual lifecycle of a page is pool->active->syncing->pool
- "active" page - is a page where new xid's are logged.
- the page stays active as long as syncing slot is taken.
- "syncing" page is being synced to disk. no new xid can be added to it.
- when the sync is done the page is moved to a pool and an active page
+ the log consists of a file, mapped to memory.
+ file is divided into pages of tc_log_page_size size.
+ (usable size of the first page is smaller because of the log header)
+ there is a PAGE control structure for each page
+ each page (or rather its PAGE control structure) can be in one of
+ the three states - active, syncing, pool.
+ there could be only one page in the active or syncing state,
+ but many in pool - pool is a fifo queue.
+ the usual lifecycle of a page is pool->active->syncing->pool.
+ the "active" page is a page where new xid's are logged.
+ the page stays active as long as the syncing slot is taken.
+ the "syncing" page is being synced to disk. no new xid can be added to it.
+ when the syncing is done the page is moved to a pool and an active page
becomes "syncing".
the result of such an architecture is a natural "commit grouping" -
If commits are coming faster than the system can sync, they do not
- stall. Instead, all commit that came since the last sync are
- logged to the same page, and they all are synced with the next -
+ stall. Instead, all commits that came since the last sync are
+ logged to the same "active" page, and they all are synced with the next -
one - sync. Thus, thought individual commits are delayed, throughput
is not decreasing.
- when a xid is added to an active page, the thread of this xid waits
+ when an xid is added to an active page, the thread of this xid waits
for a page's condition until the page is synced. when syncing slot
becomes vacant one of these waiters is awaken to take care of syncing.
it syncs the page and signals all waiters that the page is synced.
PAGE::waiters is used to count these waiters, and a page may never
become active again until waiters==0 (that is all waiters from the
- previous sync have noticed the sync was completed)
+ previous sync have noticed that the sync was completed)
note, that the page becomes "dirty" and has to be synced only when a
new xid is added into it. Removing a xid from a page does not make it
- dirty - we don't sync removals to disk.
+ dirty - we don't sync xid removals to disk.
*/
ulong tc_log_page_waits= 0;
@@ -5928,7 +6735,7 @@ ulong tc_log_page_waits= 0;
#define TC_LOG_HEADER_SIZE (sizeof(tc_log_magic)+1)
-static const char tc_log_magic[]={(char) 254, 0x23, 0x05, 0x74};
+static const uchar tc_log_magic[]={(uchar) 254, 0x23, 0x05, 0x74};
ulong opt_tc_log_size= TC_LOG_MIN_SIZE;
ulong tc_log_max_pages_used=0, tc_log_page_size=0, tc_log_cur_pages_used=0;
@@ -5986,7 +6793,8 @@ int TC_LOG_MMAP::open(const char *opt_name)
inited=2;
npages=(uint)file_length/tc_log_page_size;
- DBUG_ASSERT(npages >= 3); // to guarantee non-empty pool
+ if (npages < 3) // to guarantee non-empty pool
+ goto err;
if (!(pages=(PAGE *)my_malloc(npages*sizeof(PAGE), MYF(MY_WME|MY_ZEROFILL))))
goto err;
inited=3;
@@ -5997,9 +6805,9 @@ int TC_LOG_MMAP::open(const char *opt_name)
pg->state=PS_POOL;
mysql_mutex_init(key_PAGE_lock, &pg->lock, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_PAGE_cond, &pg->cond, 0);
- pg->start=(my_xid *)(data + i*tc_log_page_size);
- pg->end=(my_xid *)(pg->start + tc_log_page_size);
+ pg->ptr= pg->start=(my_xid *)(data + i*tc_log_page_size);
pg->size=pg->free=tc_log_page_size/sizeof(my_xid);
+ pg->end=pg->start + pg->size;
}
pages[0].size=pages[0].free=
(tc_log_page_size-TC_LOG_HEADER_SIZE)/sizeof(my_xid);
@@ -6020,6 +6828,7 @@ int TC_LOG_MMAP::open(const char *opt_name)
mysql_mutex_init(key_LOCK_pool, &LOCK_pool, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_active, &COND_active, 0);
mysql_cond_init(key_COND_pool, &COND_pool, 0);
+ mysql_cond_init(key_COND_queue_busy, &COND_queue_busy, 0);
inited=6;
@@ -6027,6 +6836,8 @@ int TC_LOG_MMAP::open(const char *opt_name)
active=pages;
pool=pages+1;
pool_last=pages+npages-1;
+ commit_ordered_queue= NULL;
+ commit_ordered_queue_busy= false;
return 0;
@@ -6043,7 +6854,7 @@ err:
-# if there're waiters - take the one with the most free space.
@todo
- TODO page merging. try to allocate adjacent page first,
+ page merging. try to allocate adjacent page first,
so that they can be flushed both in one sync
*/
@@ -6052,8 +6863,7 @@ void TC_LOG_MMAP::get_active_from_pool()
PAGE **p, **best_p=0;
int best_free;
- if (syncing)
- mysql_mutex_lock(&LOCK_pool);
+ mysql_mutex_lock(&LOCK_pool);
do
{
@@ -6073,20 +6883,21 @@ void TC_LOG_MMAP::get_active_from_pool()
}
while ((*best_p == 0 || best_free == 0) && overflow());
+ mysql_mutex_assert_owner(&LOCK_active);
active=*best_p;
- if (active->free == active->size) // we've chosen an empty page
- {
- tc_log_cur_pages_used++;
- set_if_bigger(tc_log_max_pages_used, tc_log_cur_pages_used);
- }
if ((*best_p)->next) // unlink the page from the pool
*best_p=(*best_p)->next;
else
pool_last=*best_p;
+ mysql_mutex_unlock(&LOCK_pool);
- if (syncing)
- mysql_mutex_unlock(&LOCK_pool);
+ mysql_mutex_lock(&active->lock);
+ if (active->free == active->size) // we've chosen an empty page
+ {
+ tc_log_cur_pages_used++;
+ set_if_bigger(tc_log_max_pages_used, tc_log_cur_pages_used);
+ }
}
/**
@@ -6132,7 +6943,7 @@ int TC_LOG_MMAP::overflow()
to the position in memory where xid was logged to.
*/
-int TC_LOG_MMAP::log_xid(THD *thd, my_xid xid)
+int TC_LOG_MMAP::log_one_transaction(my_xid xid)
{
int err;
PAGE *p;
@@ -6141,7 +6952,7 @@ int TC_LOG_MMAP::log_xid(THD *thd, my_xid xid)
mysql_mutex_lock(&LOCK_active);
/*
- if active page is full - just wait...
+ if the active page is full - just wait...
frankly speaking, active->free here accessed outside of mutex
protection, but it's safe, because it only means we may miss an
unlog() for the active page, and we're not waiting for it here -
@@ -6153,9 +6964,17 @@ int TC_LOG_MMAP::log_xid(THD *thd, my_xid xid)
/* no active page ? take one from the pool */
if (active == 0)
get_active_from_pool();
+ else
+ mysql_mutex_lock(&active->lock);
p=active;
- mysql_mutex_lock(&p->lock);
+
+ /*
+ p->free is always > 0 here because to decrease it one needs
+ to take p->lock and before it one needs to take LOCK_active.
+ But checked that active->free > 0 under LOCK_active and
+ haven't release it ever since
+ */
/* searching for an empty slot */
while (*p->ptr)
@@ -6169,38 +6988,51 @@ int TC_LOG_MMAP::log_xid(THD *thd, my_xid xid)
*p->ptr++= xid;
p->free--;
p->state= PS_DIRTY;
-
- /* to sync or not to sync - this is the question */
- mysql_mutex_unlock(&LOCK_active);
- mysql_mutex_lock(&LOCK_sync);
mysql_mutex_unlock(&p->lock);
+ mysql_mutex_lock(&LOCK_sync);
if (syncing)
{ // somebody's syncing. let's wait
+ mysql_mutex_unlock(&LOCK_active);
+ mysql_mutex_lock(&p->lock);
p->waiters++;
- /*
- note - it must be while (), not do ... while () here
- as p->state may be not PS_DIRTY when we come here
- */
- while (p->state == PS_DIRTY && syncing)
+ for (;;)
+ {
+ int not_dirty = p->state != PS_DIRTY;
+ mysql_mutex_unlock(&p->lock);
+ if (not_dirty || !syncing)
+ break;
mysql_cond_wait(&p->cond, &LOCK_sync);
+ mysql_mutex_lock(&p->lock);
+ }
p->waiters--;
err= p->state == PS_ERROR;
if (p->state != PS_DIRTY) // page was synced
{
- if (p->waiters == 0)
- mysql_cond_signal(&COND_pool); // in case somebody's waiting
mysql_mutex_unlock(&LOCK_sync);
+ if (p->waiters == 0)
+ mysql_cond_signal(&COND_pool); // in case somebody's waiting
+ mysql_mutex_unlock(&p->lock);
goto done; // we're done
}
- } // page was not synced! do it now
- DBUG_ASSERT(active == p && syncing == 0);
- mysql_mutex_lock(&LOCK_active);
- syncing=p; // place is vacant - take it
- active=0; // page is not active anymore
- mysql_cond_broadcast(&COND_active); // in case somebody's waiting
- mysql_mutex_unlock(&LOCK_active);
- mysql_mutex_unlock(&LOCK_sync);
+ DBUG_ASSERT(!syncing);
+ mysql_mutex_unlock(&p->lock);
+ syncing = p;
+ mysql_mutex_unlock(&LOCK_sync);
+
+ mysql_mutex_lock(&LOCK_active);
+ active=0; // page is not active anymore
+ mysql_cond_broadcast(&COND_active);
+ mysql_mutex_unlock(&LOCK_active);
+ }
+ else
+ {
+ syncing = p; // place is vacant - take it
+ mysql_mutex_unlock(&LOCK_sync);
+ active = 0; // page is not active anymore
+ mysql_cond_broadcast(&COND_active);
+ mysql_mutex_unlock(&LOCK_active);
+ }
err= sync();
done:
@@ -6217,7 +7049,7 @@ int TC_LOG_MMAP::sync()
sit down and relax - this can take a while...
note - no locks are held at this point
*/
- err= my_msync(fd, syncing->start, 1, MS_SYNC);
+ err= my_msync(fd, syncing->start, syncing->size * sizeof(my_xid), MS_SYNC);
/* page is synced. let's move it to the pool */
mysql_mutex_lock(&LOCK_pool);
@@ -6225,14 +7057,23 @@ int TC_LOG_MMAP::sync()
pool_last=syncing;
syncing->next=0;
syncing->state= err ? PS_ERROR : PS_POOL;
- mysql_cond_broadcast(&syncing->cond); // signal "sync done"
- mysql_cond_signal(&COND_pool); // in case somebody's waiting
+ mysql_cond_signal(&COND_pool); // in case somebody's waiting
mysql_mutex_unlock(&LOCK_pool);
/* marking 'syncing' slot free */
mysql_mutex_lock(&LOCK_sync);
+ mysql_cond_broadcast(&syncing->cond); // signal "sync done"
syncing=0;
- mysql_cond_signal(&active->cond); // wake up a new syncer
+ /*
+ we check the "active" pointer without LOCK_active. Still, it's safe -
+ "active" can change from NULL to not NULL any time, but it
+ will take LOCK_sync before waiting on active->cond. That is, it can never
+ miss a signal.
+ And "active" can change to NULL only by the syncing thread
+ (the thread that will send a signal below)
+ */
+ if (active)
+ mysql_cond_signal(&active->cond); // wake up a new syncer
mysql_mutex_unlock(&LOCK_sync);
return err;
}
@@ -6249,13 +7090,13 @@ int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid)
DBUG_ASSERT(*x == xid);
DBUG_ASSERT(x >= p->start && x < p->end);
- *x=0;
mysql_mutex_lock(&p->lock);
+ *x=0;
p->free++;
DBUG_ASSERT(p->free <= p->size);
set_if_smaller(p->ptr, x);
- if (p->free == p->size) // the page is completely empty
+ if (p->free == p->size) // the page is completely empty
statistic_decrement(tc_log_cur_pages_used, &LOCK_status);
if (p->waiters == 0) // the page is in pool and ready to rock
mysql_cond_signal(&COND_pool); // ping ... for overflow()
@@ -6272,6 +7113,8 @@ void TC_LOG_MMAP::close()
mysql_mutex_destroy(&LOCK_active);
mysql_mutex_destroy(&LOCK_pool);
mysql_cond_destroy(&COND_pool);
+ mysql_cond_destroy(&COND_active);
+ mysql_cond_destroy(&COND_queue_busy);
case 5:
data[0]='A'; // garble the first (signature) byte, in case mysql_file_delete fails
case 4:
@@ -6299,7 +7142,7 @@ int TC_LOG_MMAP::recover()
HASH xids;
PAGE *p=pages, *end_p=pages+npages;
- if (memcmp(data, tc_log_magic, sizeof(tc_log_magic)))
+ if (bcmp(data, tc_log_magic, sizeof(tc_log_magic)))
{
sql_print_error("Bad magic header in tc log");
goto err1;
@@ -6451,7 +7294,8 @@ int TC_LOG_BINLOG::open(const char *opt_name)
goto err;
}
- if ((ev= Log_event::read_log_event(&log, 0, &fdle)) &&
+ if ((ev= Log_event::read_log_event(&log, 0, &fdle,
+ opt_master_verify_checksum)) &&
ev->get_type_code() == FORMAT_DESCRIPTION_EVENT &&
ev->flags & LOG_EVENT_BINLOG_IN_USE_F)
{
@@ -6481,42 +7325,86 @@ void TC_LOG_BINLOG::close()
mysql_cond_destroy(&COND_prep_xids);
}
-/**
- @todo
- group commit
-
- @retval
- 0 error
- @retval
- 1 success
+/*
+ Do a binlog log_xid() for a group of transactions, linked through
+ thd->next_commit_ordered.
*/
-int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid)
+int
+TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
+ bool need_prepare_ordered __attribute__((unused)),
+ bool need_commit_ordered __attribute__((unused)))
{
- DBUG_ENTER("TC_LOG_BINLOG::log");
+ int err;
+ DBUG_ENTER("TC_LOG_BINLOG::log_and_order");
+
binlog_cache_mngr *cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
- /*
- We always commit the entire transaction when writing an XID. Also
- note that the return value is inverted.
- */
- DBUG_RETURN(!binlog_commit_flush_stmt_cache(thd, cache_mngr) &&
- !binlog_commit_flush_trx_cache(thd, cache_mngr, xid));
+
+ cache_mngr->using_xa= TRUE;
+ cache_mngr->xa_xid= xid;
+ err= binlog_commit_flush_xid_caches(thd, cache_mngr, all, xid);
+
+ DEBUG_SYNC(thd, "binlog_after_log_and_order");
+
+ DBUG_RETURN(!err);
}
-int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
+/*
+ After an XID is logged, we need to hold on to the current binlog file until
+ it is fully committed in the storage engine. The reason is that crash
+ recovery only looks at the latest binlog, so we must make sure there are no
+ outstanding prepared (but not committed) transactions before rotating the
+ binlog.
+
+ To handle this, we keep a count of outstanding XIDs. This function is used
+ to increase this count when committing one or more transactions to the
+ binary log.
+*/
+void
+TC_LOG_BINLOG::mark_xids_active(uint xid_count)
{
- DBUG_ENTER("TC_LOG_BINLOG::unlog");
+ DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active");
+ DBUG_PRINT("info", ("xid_count=%u", xid_count));
+ mysql_mutex_lock(&LOCK_prep_xids);
+ prepared_xids+= xid_count;
+ mysql_mutex_unlock(&LOCK_prep_xids);
+ DBUG_VOID_RETURN;
+}
+
+/*
+ Once an XID is committed, it is safe to rotate the binary log, as it can no
+ longer be needed during crash recovery.
+
+ This function is called to mark an XID this way. It needs to decrease the
+ count of pending XIDs, and signal the log rotator thread when it reaches zero.
+*/
+void
+TC_LOG_BINLOG::mark_xid_done()
+{
+ my_bool send_signal;
+
+ DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done");
mysql_mutex_lock(&LOCK_prep_xids);
// prepared_xids can be 0 if the transaction had ignorable errors.
DBUG_ASSERT(prepared_xids >= 0);
if (prepared_xids > 0)
prepared_xids--;
- if (prepared_xids == 0) {
+ send_signal= (prepared_xids == 0);
+ mysql_mutex_unlock(&LOCK_prep_xids);
+ if (send_signal) {
DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
mysql_cond_signal(&COND_prep_xids);
}
- mysql_mutex_unlock(&LOCK_prep_xids);
- DBUG_RETURN(rotate_and_purge(0)); // as ::write() did not rotate
+ DBUG_VOID_RETURN;
+}
+
+int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
+{
+ DBUG_ENTER("TC_LOG_BINLOG::unlog");
+ if (xid)
+ mark_xid_done();
+ /* As ::write_transaction_to_binlog() did not rotate, do it here. */
+ DBUG_RETURN(rotate_and_purge(0));
}
int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
@@ -6534,7 +7422,9 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
fdle->flags&= ~LOG_EVENT_BINLOG_IN_USE_F; // abort on the first error
- while ((ev= Log_event::read_log_event(log,0,fdle)) && ev->is_valid())
+ while ((ev= Log_event::read_log_event(log, 0, fdle,
+ opt_master_verify_checksum))
+ && ev->is_valid())
{
if (ev->get_type_code() == XID_EVENT)
{
@@ -6585,13 +7475,174 @@ ulonglong mysql_bin_log_file_pos(void)
{
return (ulonglong) mysql_bin_log.get_log_file()->pos_in_file;
}
+/*
+ Get the current position of the MySQL binlog for transaction currently being
+ committed.
+
+ This is valid to call from within storage engine commit_ordered() and
+ commit() methods only.
+
+ Since it stores the position inside THD, it is safe to call without any
+ locking.
+*/
+void
+mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file)
+{
+ binlog_cache_mngr *cache_mngr;
+ if (opt_bin_log &&
+ (cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton)))
+ {
+ *out_file= cache_mngr->last_commit_pos_file;
+ *out_pos= (ulonglong)(cache_mngr->last_commit_pos_offset);
+ }
+ else
+ {
+ *out_file= NULL;
+ *out_pos= 0;
+ }
+}
#endif /* INNODB_COMPATIBILITY_HOOKS */
+static void
+binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save)
+{
+ ulong value= *((ulong *)save);
+
+ mysql_mutex_lock(mysql_bin_log.get_log_lock());
+ if(mysql_bin_log.is_open())
+ {
+ uint flags= RP_FORCE_ROTATE | RP_LOCK_LOG_IS_ALREADY_LOCKED |
+ (binlog_checksum_options != (uint) value?
+ RP_BINLOG_CHECKSUM_ALG_CHANGE : 0);
+ if (flags & RP_BINLOG_CHECKSUM_ALG_CHANGE)
+ mysql_bin_log.checksum_alg_reset= (uint8) value;
+ mysql_bin_log.rotate_and_purge(flags);
+ }
+ else
+ {
+ binlog_checksum_options= value;
+ }
+ DBUG_ASSERT((ulong) binlog_checksum_options == value);
+ DBUG_ASSERT(mysql_bin_log.checksum_alg_reset == BINLOG_CHECKSUM_ALG_UNDEF);
+ mysql_mutex_unlock(mysql_bin_log.get_log_lock());
+}
+
+
+static int show_binlog_vars(THD *thd, SHOW_VAR *var, char *buff)
+{
+ mysql_bin_log.set_status_variables(thd);
+ var->type= SHOW_ARRAY;
+ var->value= (char *)&binlog_status_vars_detail;
+ return 0;
+}
+
+static SHOW_VAR binlog_status_vars_top[]= {
+ {"binlog", (char *) &show_binlog_vars, SHOW_FUNC},
+ {NullS, NullS, SHOW_LONG}
+};
+
+static MYSQL_SYSVAR_BOOL(
+ optimize_thread_scheduling,
+ opt_optimize_thread_scheduling,
+ PLUGIN_VAR_READONLY,
+ "Run fast part of group commit in a single thread, to optimize kernel "
+ "thread scheduling. On by default. Disable to run each transaction in group "
+ "commit in its own thread, which can be slower at very high concurrency. "
+ "This option is mostly for testing one algorithm versus the other, and it "
+ "should not normally be necessary to change it.",
+ NULL,
+ NULL,
+ 1);
+
+static MYSQL_SYSVAR_ENUM(
+ checksum,
+ binlog_checksum_options,
+ PLUGIN_VAR_RQCMDARG,
+ "Type of BINLOG_CHECKSUM_ALG. Include checksum for "
+ "log events in the binary log. Possible values are NONE and CRC32; "
+ "default is NONE.",
+ NULL,
+ binlog_checksum_update,
+ BINLOG_CHECKSUM_ALG_OFF,
+ &binlog_checksum_typelib);
+
+#ifndef DBUG_OFF
+static MYSQL_SYSVAR_ULONG(
+ dbug_fsync_sleep,
+ opt_binlog_dbug_fsync_sleep,
+ PLUGIN_VAR_RQCMDARG,
+ "Extra sleep (in microseconds) to add to binlog fsync(), for debugging",
+ NULL,
+ NULL,
+ 0,
+ 0,
+ ULONG_MAX,
+ 0);
+#endif
+
+static struct st_mysql_sys_var *binlog_sys_vars[]=
+{
+ MYSQL_SYSVAR(optimize_thread_scheduling),
+ MYSQL_SYSVAR(checksum),
+#ifndef DBUG_OFF
+ MYSQL_SYSVAR(dbug_fsync_sleep),
+#endif
+ NULL
+};
+
+
+/*
+ Copy out the non-directory part of binlog position filename for the
+ `binlog_snapshot_file' status variable, same way as it is done for
+ SHOW MASTER STATUS.
+*/
+static void
+set_binlog_snapshot_file(const char *src)
+{
+ int dir_len = dirname_length(src);
+ strmake(binlog_snapshot_file, src + dir_len, sizeof(binlog_snapshot_file)-1);
+}
+
+/*
+ Copy out current values of status variables, for SHOW STATUS or
+ information_schema.global_status.
+
+ This is called only under LOCK_status, so we can fill in a static array.
+*/
+void
+TC_LOG_BINLOG::set_status_variables(THD *thd)
+{
+ binlog_cache_mngr *cache_mngr;
+
+ if (thd && opt_bin_log)
+ cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ else
+ cache_mngr= 0;
+
+ bool have_snapshot= (cache_mngr && cache_mngr->last_commit_pos_file[0] != 0);
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ binlog_status_var_num_commits= this->num_commits;
+ binlog_status_var_num_group_commits= this->num_group_commits;
+ if (!have_snapshot)
+ {
+ set_binlog_snapshot_file(last_commit_pos_file);
+ binlog_snapshot_position= last_commit_pos_offset;
+ }
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+
+ if (have_snapshot)
+ {
+ set_binlog_snapshot_file(cache_mngr->last_commit_pos_file);
+ binlog_snapshot_position= cache_mngr->last_commit_pos_offset;
+ }
+}
+
struct st_mysql_storage_engine binlog_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
-mysql_declare_plugin(binlog)
+maria_declare_plugin(binlog)
{
MYSQL_STORAGE_ENGINE_PLUGIN,
&binlog_storage_engine,
@@ -6602,9 +7653,9 @@ mysql_declare_plugin(binlog)
binlog_init, /* Plugin Init */
NULL, /* Plugin Deinit */
0x0100 /* 1.0 */,
- NULL, /* status variables */
- NULL, /* system variables */
- NULL, /* config options */
- 0, /* flags */
+ binlog_status_vars_top, /* status variables */
+ binlog_sys_vars, /* system variables */
+ "1.0", /* string version */
+ MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
}
-mysql_declare_plugin_end;
+maria_declare_plugin_end;