summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
authorSergei Golubchik <sergii@pisem.net>2011-10-19 21:45:18 +0200
committerSergei Golubchik <sergii@pisem.net>2011-10-19 21:45:18 +0200
commit76f0b94bb0b2994d639353530c5b251d0f1a204b (patch)
tree9ed50628aac34f89a37637bab2fc4915b86b5eb4 /sql/log.cc
parent4e46d8e5bff140f2549841167dc4b65a3c0a645d (diff)
parent5dc1a2231f55bacc9aaf0e24816f3d9c2ee1f21d (diff)
downloadmariadb-git-76f0b94bb0b2994d639353530c5b251d0f1a204b.tar.gz
merge with 5.3
sql/sql_insert.cc: CREATE ... IF NOT EXISTS may do nothing, but it is still not a failure. don't forget to my_ok it. ****** CREATE ... IF NOT EXISTS may do nothing, but it is still not a failure. don't forget to my_ok it. sql/sql_table.cc: small cleanup ****** small cleanup
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc1595
1 files changed, 1284 insertions, 311 deletions
diff --git a/sql/log.cc b/sql/log.cc
index e66ca32d560..2b248fa80ba 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -1,4 +1,5 @@
-/* Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved.
+/* Copyright (c) 2000, 2011, Oracle and/or its affiliates.
+ Copyright (c) 2010-2011 Monty Program Ab
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -50,6 +51,7 @@
#include "sql_plugin.h"
#include "rpl_handler.h"
+#include "debug_sync.h"
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
@@ -71,6 +73,38 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv);
static int binlog_commit(handlerton *hton, THD *thd, bool all);
static int binlog_rollback(handlerton *hton, THD *thd, bool all);
static int binlog_prepare(handlerton *hton, THD *thd, bool all);
+static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd);
+
+static LEX_STRING const write_error_msg=
+ { C_STRING_WITH_LEN("error writing to the binary log") };
+
+static my_bool opt_optimize_thread_scheduling= TRUE;
+ulong binlog_checksum_options;
+#ifndef DBUG_OFF
+static ulong opt_binlog_dbug_fsync_sleep= 0;
+#endif
+
+mysql_mutex_t LOCK_prepare_ordered;
+mysql_mutex_t LOCK_commit_ordered;
+
+static ulonglong binlog_status_var_num_commits;
+static ulonglong binlog_status_var_num_group_commits;
+static char binlog_snapshot_file[FN_REFLEN];
+static ulonglong binlog_snapshot_position;
+
+static SHOW_VAR binlog_status_vars_detail[]=
+{
+ {"commits",
+ (char *)&binlog_status_var_num_commits, SHOW_LONGLONG},
+ {"group_commits",
+ (char *)&binlog_status_var_num_group_commits, SHOW_LONGLONG},
+ {"snapshot_file",
+ (char *)&binlog_snapshot_file, SHOW_CHAR},
+ {"snapshot_position",
+ (char *)&binlog_snapshot_position, SHOW_LONGLONG},
+ {NullS, NullS, SHOW_LONG}
+};
+
/**
purge logs, master and slave sides both, related error code
@@ -148,59 +182,27 @@ sql_print_message_func sql_print_message_handlers[3] =
sql_print_error
};
-/**
- Create the name of the log specified.
- This method forms a new path + file name for the
- log specified in @c name.
-
- @param[IN] buff Location for building new string.
- @param[IN] name Name of the log file.
- @param[IN] log_ext The extension for the log (e.g. .log).
-
- @returns Pointer to new string containing the name.
+/**
+ Create the name of the log file
+
+ @param[OUT] out a pointer to a new allocated name will go there
+ @param[IN] log_ext The extension for the file (e.g .log)
+ @param[IN] once whether to use malloc_once or a normal malloc.
*/
-char *make_log_name(char *buff, const char *name, const char* log_ext)
+void make_default_log_name(char **out, const char* log_ext, bool once)
{
- strmake(buff, name, FN_REFLEN-5);
- return fn_format(buff, buff, mysql_real_data_home, log_ext,
- MYF(MY_UNPACK_FILENAME|MY_REPLACE_EXT));
-}
-
-/*
- Helper class to hold a mutex for the duration of the
- block.
-
- Eliminates the need for explicit unlocking of mutexes on, e.g.,
- error returns. On passing a null pointer, the sentry will not do
- anything.
- */
-class Mutex_sentry
-{
-public:
- Mutex_sentry(mysql_mutex_t *mutex)
- : m_mutex(mutex)
- {
- if (m_mutex)
- mysql_mutex_lock(mutex);
- }
-
- ~Mutex_sentry()
+ char buff[FN_REFLEN+10];
+ fn_format(buff, opt_log_basename, "", log_ext, MYF(MY_REPLACE_EXT));
+ if (once)
+ *out= my_once_strdup(buff, MYF(MY_WME));
+ else
{
- if (m_mutex)
- mysql_mutex_unlock(m_mutex);
-#ifndef DBUG_OFF
- m_mutex= 0;
-#endif
+ my_free(*out);
+ *out= my_strdup(buff, MYF(MY_WME));
}
+}
-private:
- mysql_mutex_t *m_mutex;
-
- // It's not allowed to copy this object in any way
- Mutex_sentry(Mutex_sentry const&);
- void operator=(Mutex_sentry const&);
-};
/*
Helper classes to store non-transactional and transactional data
@@ -422,6 +424,7 @@ public:
ulong *param_ptr_binlog_stmt_cache_disk_use,
ulong *param_ptr_binlog_cache_use,
ulong *param_ptr_binlog_cache_disk_use)
+ : last_commit_pos_offset(0), using_xa(FALSE), xa_xid(0)
{
stmt_cache.set_binlog_cache_info(param_max_binlog_stmt_cache_size,
param_ptr_binlog_stmt_cache_use,
@@ -429,11 +432,20 @@ public:
trx_cache.set_binlog_cache_info(param_max_binlog_cache_size,
param_ptr_binlog_cache_use,
param_ptr_binlog_cache_disk_use);
+ last_commit_pos_file[0]= 0;
}
- void reset_cache(binlog_cache_data* cache_data)
+ void reset(bool do_stmt, bool do_trx)
{
- cache_data->reset();
+ if (do_stmt)
+ stmt_cache.reset();
+ if (do_trx)
+ {
+ trx_cache.reset();
+ using_xa= FALSE;
+ last_commit_pos_file[0]= 0;
+ last_commit_pos_offset= 0;
+ }
}
binlog_cache_data* get_binlog_cache_data(bool is_transactional)
@@ -450,6 +462,23 @@ public:
binlog_cache_data trx_cache;
+ /*
+ Binlog position for current transaction.
+ For START TRANSACTION WITH CONSISTENT SNAPSHOT, this is the binlog
+ position corresponding to the snapshot taken. During (and after) commit,
+ this is set to the binlog position corresponding to just after the
+ commit (so storage engines can store it in their transaction log).
+ */
+ char last_commit_pos_file[FN_REFLEN];
+ my_off_t last_commit_pos_offset;
+
+ /*
+ Flag set true if this transaction is committed with log_xid() as part of
+ XA, false if not.
+ */
+ bool using_xa;
+ my_xid xa_xid;
+
private:
binlog_cache_mngr& operator=(const binlog_cache_mngr& info);
@@ -552,7 +581,7 @@ void Log_to_csv_event_handler::cleanup()
*/
bool Log_to_csv_event_handler::
- log_general(THD *thd, time_t event_time, const char *user_host,
+ log_general(THD *thd, my_hrtime_t event_time, const char *user_host,
uint user_host_len, int thread_id,
const char *command_type, uint command_type_len,
const char *sql_text, uint sql_text_len,
@@ -629,8 +658,8 @@ bool Log_to_csv_event_handler::
DBUG_ASSERT(table->field[0]->type() == MYSQL_TYPE_TIMESTAMP);
- ((Field_timestamp*) table->field[0])->store_timestamp((my_time_t)
- event_time);
+ ((Field_timestamp*) table->field[0])->store_TIME(
+ hrtime_to_my_time(event_time), hrtime_sec_part(event_time));
/* do a write */
if (table->field[1]->store(user_host, user_host_len, client_cs) ||
@@ -694,7 +723,6 @@ err:
log_slow()
thd THD of the query
current_time current timestamp
- query_start_arg command start timestamp
user_host the pointer to the string with user@host info
user_host_len length of the user_host string. this is computed once
and passed to all general log event handlers
@@ -717,7 +745,7 @@ err:
*/
bool Log_to_csv_event_handler::
- log_slow(THD *thd, time_t current_time, time_t query_start_arg,
+ log_slow(THD *thd, my_hrtime_t current_time,
const char *user_host, uint user_host_len,
ulonglong query_utime, ulonglong lock_utime, bool is_command,
const char *sql_text, uint sql_text_len)
@@ -731,6 +759,11 @@ bool Log_to_csv_event_handler::
Open_tables_backup open_tables_backup;
CHARSET_INFO *client_cs= thd->variables.character_set_client;
bool save_time_zone_used;
+ long query_time= (long) min(query_utime/1000000, TIME_MAX_VALUE_SECONDS);
+ long lock_time= (long) min(lock_utime/1000000, TIME_MAX_VALUE_SECONDS);
+ long query_time_micro= (long) (query_utime % 1000000);
+ long lock_time_micro= (long) (lock_utime % 1000000);
+
DBUG_ENTER("Log_to_csv_event_handler::log_slow");
thd->push_internal_handler(& error_handler);
@@ -767,45 +800,34 @@ bool Log_to_csv_event_handler::
/* store the time and user values */
DBUG_ASSERT(table->field[0]->type() == MYSQL_TYPE_TIMESTAMP);
- ((Field_timestamp*) table->field[0])->store_timestamp((my_time_t)
- current_time);
+ ((Field_timestamp*) table->field[0])->store_TIME(
+ hrtime_to_my_time(current_time), hrtime_sec_part(current_time));
if (table->field[1]->store(user_host, user_host_len, client_cs))
goto err;
- if (query_start_arg)
- {
- longlong query_time= (longlong) (query_utime/1000000);
- longlong lock_time= (longlong) (lock_utime/1000000);
- /*
- A TIME field can not hold the full longlong range; query_time or
- lock_time may be truncated without warning here, if greater than
- 839 hours (~35 days)
- */
- MYSQL_TIME t;
- t.neg= 0;
+ /*
+ A TIME field can not hold the full longlong range; query_time or
+ lock_time may be truncated without warning here, if greater than
+ 839 hours (~35 days)
+ */
+ MYSQL_TIME t;
+ t.neg= 0;
+
+ /* fill in query_time field */
+ calc_time_from_sec(&t, query_time, query_time_micro);
+ if (table->field[2]->store_time(&t))
+ goto err;
+ /* lock_time */
+ calc_time_from_sec(&t, lock_time, lock_time_micro);
+ if (table->field[3]->store_time(&t))
+ goto err;
+ /* rows_sent */
+ if (table->field[4]->store((longlong) thd->sent_row_count, TRUE))
+ goto err;
+ /* rows_examined */
+ if (table->field[5]->store((longlong) thd->examined_row_count, TRUE))
+ goto err;
- /* fill in query_time field */
- calc_time_from_sec(&t, (long) min(query_time, (longlong) TIME_MAX_VALUE_SECONDS), 0);
- if (table->field[2]->store_time(&t, MYSQL_TIMESTAMP_TIME))
- goto err;
- /* lock_time */
- calc_time_from_sec(&t, (long) min(lock_time, (longlong) TIME_MAX_VALUE_SECONDS), 0);
- if (table->field[3]->store_time(&t, MYSQL_TIMESTAMP_TIME))
- goto err;
- /* rows_sent */
- if (table->field[4]->store((longlong) thd->sent_row_count, TRUE))
- goto err;
- /* rows_examined */
- if (table->field[5]->store((longlong) thd->examined_row_count, TRUE))
- goto err;
- }
- else
- {
- table->field[2]->set_null();
- table->field[3]->set_null();
- table->field[4]->set_null();
- table->field[5]->set_null();
- }
/* fill database field */
if (thd->db)
{
@@ -937,14 +959,14 @@ void Log_to_file_event_handler::init_pthread_objects()
/** Wrapper around MYSQL_LOG::write() for slow log. */
bool Log_to_file_event_handler::
- log_slow(THD *thd, time_t current_time, time_t query_start_arg,
+ log_slow(THD *thd, my_hrtime_t current_time,
const char *user_host, uint user_host_len,
ulonglong query_utime, ulonglong lock_utime, bool is_command,
const char *sql_text, uint sql_text_len)
{
Silence_log_table_errors error_handler;
thd->push_internal_handler(&error_handler);
- bool retval= mysql_slow_log.write(thd, current_time, query_start_arg,
+ bool retval= mysql_slow_log.write(thd, hrtime_to_my_time(current_time),
user_host, user_host_len,
query_utime, lock_utime, is_command,
sql_text, sql_text_len);
@@ -959,7 +981,7 @@ bool Log_to_file_event_handler::
*/
bool Log_to_file_event_handler::
- log_general(THD *thd, time_t event_time, const char *user_host,
+ log_general(THD *thd, my_hrtime_t event_time, const char *user_host,
uint user_host_len, int thread_id,
const char *command_type, uint command_type_len,
const char *sql_text, uint sql_text_len,
@@ -967,7 +989,8 @@ bool Log_to_file_event_handler::
{
Silence_log_table_errors error_handler;
thd->push_internal_handler(&error_handler);
- bool retval= mysql_log.write(event_time, user_host, user_host_len,
+ bool retval= mysql_log.write(hrtime_to_time(event_time), user_host,
+ user_host_len,
thread_id, command_type, command_type_len,
sql_text, sql_text_len);
thd->pop_internal_handler();
@@ -1200,8 +1223,6 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length,
if (*slow_log_handler_list)
{
- time_t current_time;
-
/* do not log slow queries from replication threads */
if (thd->slave_thread && !opt_log_slow_slave_statements)
return 0;
@@ -1221,16 +1242,12 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length,
sctx->ip ? sctx->ip : "", "]", NullS) -
user_host_buff);
- current_time= my_time_possible_from_micro(current_utime);
- if (thd->start_utime)
- {
- query_utime= (current_utime - thd->start_utime);
- lock_utime= (thd->utime_after_lock - thd->start_utime);
- }
- else
- {
- query_utime= lock_utime= 0;
- }
+ DBUG_ASSERT(thd->start_utime);
+ DBUG_ASSERT(thd->start_time);
+ query_utime= (current_utime - thd->start_utime);
+ lock_utime= (thd->utime_after_lock - thd->start_utime);
+ my_hrtime_t current_time= { hrtime_from_time(thd->start_time) +
+ thd->start_time_sec_part + query_utime };
if (!query)
{
@@ -1251,7 +1268,7 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length,
}
for (current_handler= slow_log_handler_list; *current_handler ;)
- error= (*current_handler++)->log_slow(thd, current_time, thd->start_time,
+ error= (*current_handler++)->log_slow(thd, current_time,
user_host_buff, user_host_len,
query_utime, lock_utime, is_command,
query, query_length) || error;
@@ -1268,7 +1285,7 @@ bool LOGGER::general_log_write(THD *thd, enum enum_server_command command,
Log_event_handler **current_handler= general_log_handler_list;
char user_host_buff[MAX_USER_HOST_SIZE + 1];
uint user_host_len= 0;
- time_t current_time;
+ my_hrtime_t current_time;
DBUG_ASSERT(thd);
@@ -1280,9 +1297,9 @@ bool LOGGER::general_log_write(THD *thd, enum enum_server_command command,
}
user_host_len= make_user_name(thd, user_host_buff);
- current_time= my_time(0);
+ current_time= my_hrtime();
- mysql_audit_general_log(thd, current_time,
+ mysql_audit_general_log(thd, hrtime_to_time(current_time),
user_host_buff, user_host_len,
command_name[(uint) command].str,
command_name[(uint) command].length,
@@ -1587,6 +1604,7 @@ int binlog_init(void *p)
binlog_hton->commit= binlog_commit;
binlog_hton->rollback= binlog_rollback;
binlog_hton->prepare= binlog_prepare;
+ binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot;
binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN;
return 0;
}
@@ -1602,48 +1620,69 @@ static int binlog_close_connection(handlerton *hton, THD *thd)
return 0;
}
-/**
+/*
This function flushes a cache upon commit/rollback.
- @param thd The thread whose transaction should be flushed
- @param cache_data Pointer to the cache
- @param end_ev The end event either commit/rollback
- @param is_transactional The type of the cache: transactional or
- non-transactional
+ SYNOPSIS
+ binlog_flush_cache()
- @return
- nonzero if an error pops up when flushing the cache.
-*/
-static inline int
-binlog_flush_cache(THD *thd, binlog_cache_data* cache_data, Log_event *end_evt,
- bool is_transactional)
+ thd The thread whose transaction should be ended
+ cache_mngr Pointer to the binlog_cache_mngr to use
+ all True if the entire transaction should be ended, false if
+ only the statement transaction should be ended.
+ end_ev The end event to use (COMMIT, ROLLBACK, or commit XID)
+ using_stmt True if the statement cache should be flushed
+ using_trx True if the transaction cache should be flushed
+
+ DESCRIPTION
+
+ End the currently transaction or statement. The transaction can be either
+ a real transaction or a statement transaction.
+
+ This can be to commit a transaction, with a COMMIT query event or an XA
+ commit XID event. But it can also be to rollback a transaction with a
+ ROLLBACK query event, used for rolling back transactions which also
+ contain updates to non-transactional tables. Or it can be a flush of
+ a statement cache.
+ */
+static int
+binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr,
+ Log_event *end_ev, bool all, bool using_stmt,
+ bool using_trx)
{
- DBUG_ENTER("binlog_flush_cache");
int error= 0;
+ DBUG_ENTER("binlog_flush_cache");
- if (!cache_data->empty())
+ if ((using_stmt && !cache_mngr->stmt_cache.empty()) ||
+ (using_trx && !cache_mngr->trx_cache.empty()))
{
- if (thd->binlog_flush_pending_rows_event(TRUE, is_transactional))
+ if (using_stmt && thd->binlog_flush_pending_rows_event(TRUE, FALSE))
DBUG_RETURN(1);
+ if (using_trx && thd->binlog_flush_pending_rows_event(TRUE, TRUE))
+ DBUG_RETURN(1);
+
/*
Doing a commit or a rollback including non-transactional tables,
i.e., ending a transaction where we might write the transaction
cache to the binary log.
We can always end the statement when ending a transaction since
- transactions are not allowed inside stored functions. If they
+ transactions are not allowed inside stored functions. If they
were, we would have to ensure that we're not ending a statement
inside a stored function.
*/
- error= mysql_bin_log.write(thd, &cache_data->cache_log, end_evt,
- cache_data->has_incident());
+ error= mysql_bin_log.write_transaction_to_binlog(thd, cache_mngr,
+ end_ev, all,
+ using_stmt, using_trx);
}
- cache_data->reset();
+ cache_mngr->reset(using_stmt, using_trx);
- DBUG_ASSERT(cache_data->empty());
+ DBUG_ASSERT((!using_stmt || cache_mngr->stmt_cache.empty()) &&
+ (!using_trx || cache_mngr->trx_cache.empty()));
DBUG_RETURN(error);
}
+
/**
This function flushes the stmt-cache upon commit.
@@ -1654,13 +1693,12 @@ binlog_flush_cache(THD *thd, binlog_cache_data* cache_data, Log_event *end_evt,
nonzero if an error pops up when flushing the cache.
*/
static inline int
-binlog_commit_flush_stmt_cache(THD *thd,
+binlog_commit_flush_stmt_cache(THD *thd, bool all,
binlog_cache_mngr *cache_mngr)
{
Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
- FALSE, FALSE, TRUE, 0);
- return (binlog_flush_cache(thd, &cache_mngr->stmt_cache, &end_evt,
- FALSE));
+ FALSE, TRUE, TRUE, 0);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE));
}
/**
@@ -1673,12 +1711,11 @@ binlog_commit_flush_stmt_cache(THD *thd,
nonzero if an error pops up when flushing the cache.
*/
static inline int
-binlog_commit_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr)
+binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr)
{
Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
- TRUE, FALSE, TRUE, 0);
- return (binlog_flush_cache(thd, &cache_mngr->trx_cache, &end_evt,
- TRUE));
+ TRUE, TRUE, TRUE, 0);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE));
}
/**
@@ -1691,12 +1728,12 @@ binlog_commit_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr)
nonzero if an error pops up when flushing the cache.
*/
static inline int
-binlog_rollback_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr)
+binlog_rollback_flush_trx_cache(THD *thd, bool all,
+ binlog_cache_mngr *cache_mngr)
{
Query_log_event end_evt(thd, STRING_WITH_LEN("ROLLBACK"),
- TRUE, FALSE, TRUE, 0);
- return (binlog_flush_cache(thd, &cache_mngr->trx_cache, &end_evt,
- TRUE));
+ TRUE, TRUE, TRUE, 0);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE));
}
/**
@@ -1710,12 +1747,26 @@ binlog_rollback_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr)
nonzero if an error pops up when flushing the cache.
*/
static inline int
-binlog_commit_flush_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr,
- my_xid xid)
+binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr,
+ bool all, my_xid xid)
{
- Xid_log_event end_evt(thd, xid);
- return (binlog_flush_cache(thd, &cache_mngr->trx_cache, &end_evt,
- TRUE));
+ if (xid)
+ {
+ Xid_log_event end_evt(thd, xid, TRUE);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE));
+ }
+ else
+ {
+ /*
+ Empty xid occurs in XA COMMIT ... ONE PHASE.
+ In this case, we do not have a MySQL xid for the transaction, and the
+ external XA transaction coordinator will have to handle recovery if
+ needed. So we end the transaction with a plain COMMIT query event.
+ */
+ Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
+ TRUE, TRUE, TRUE, 0);
+ return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE));
+ }
}
/**
@@ -1754,11 +1805,11 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
if (ending_trans(thd, all))
{
if (cache_mngr->trx_cache.has_incident())
- error= mysql_bin_log.write_incident(thd, TRUE);
+ error= mysql_bin_log.write_incident(thd);
thd->clear_binlog_table_maps();
- cache_mngr->reset_cache(&cache_mngr->trx_cache);
+ cache_mngr->reset(false, true);
}
/*
If rolling back a statement in a transaction, we truncate the
@@ -1777,7 +1828,7 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all)
do nothing.
just pretend we can do 2pc, so that MySQL won't
switch to 1pc.
- real work will be done in MYSQL_BIN_LOG::log_xid()
+ real work will be done in MYSQL_BIN_LOG::log_and_order()
*/
return 0;
}
@@ -1810,7 +1861,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
if (!cache_mngr->stmt_cache.empty())
{
- error= binlog_commit_flush_stmt_cache(thd, cache_mngr);
+ error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr);
}
if (cache_mngr->trx_cache.empty())
@@ -1818,7 +1869,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
/*
we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid()
*/
- cache_mngr->reset_cache(&cache_mngr->trx_cache);
+ cache_mngr->reset(false, true);
DBUG_RETURN(error);
}
@@ -1829,7 +1880,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
Otherwise, we accumulate the changes.
*/
if (!error && ending_trans(thd, all))
- error= binlog_commit_flush_trx_cache(thd, cache_mngr);
+ error= binlog_commit_flush_trx_cache(thd, all, cache_mngr);
/*
This is part of the stmt rollback.
@@ -1868,12 +1919,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
*/
if (cache_mngr->stmt_cache.has_incident())
{
- error= mysql_bin_log.write_incident(thd, TRUE);
- cache_mngr->reset_cache(&cache_mngr->stmt_cache);
+ error= mysql_bin_log.write_incident(thd);
+ cache_mngr->reset(true, false);
}
else if (!cache_mngr->stmt_cache.empty())
{
- error= binlog_commit_flush_stmt_cache(thd, cache_mngr);
+ error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr);
}
if (cache_mngr->trx_cache.empty())
@@ -1881,7 +1932,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
/*
we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid()
*/
- cache_mngr->reset_cache(&cache_mngr->trx_cache);
+ cache_mngr->reset(false, true);
DBUG_RETURN(error);
}
@@ -1921,7 +1972,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
(trans_has_updated_non_trans_table(thd) &&
ending_single_stmt_trans(thd,all) &&
thd->variables.binlog_format == BINLOG_FORMAT_MIXED)))
- error= binlog_rollback_flush_trx_cache(thd, cache_mngr);
+ error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr);
/*
Truncate the cache if:
. aborting a single or multi-statement transaction or;
@@ -2038,7 +2089,7 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
log_query.append("`"))
DBUG_RETURN(1);
int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
- Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(),
+ Query_log_event qinfo(thd, log_query.ptr(), log_query.length(),
TRUE, FALSE, TRUE, errcode);
DBUG_RETURN(mysql_bin_log.write(&qinfo));
}
@@ -2062,7 +2113,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
log_query.append("`"))
DBUG_RETURN(1);
int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
- Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(),
+ Query_log_event qinfo(thd, log_query.ptr(), log_query.length(),
TRUE, FALSE, TRUE, errcode);
DBUG_RETURN(mysql_bin_log.write(&qinfo));
}
@@ -2190,6 +2241,7 @@ static int find_uniq_filename(char *name)
char *start, *end;
int error= 0;
DBUG_ENTER("find_uniq_filename");
+ LINT_INIT(number);
length= dirname_part(buff, name, &buf_length);
start= name + length;
@@ -2648,7 +2700,6 @@ err:
thd THD of the query
current_time current timestamp
- query_start_arg command start timestamp
user_host the pointer to the string with user@host info
user_host_len length of the user_host string. this is computed once
and passed to all general log event handlers
@@ -2670,7 +2721,7 @@ err:
*/
bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
- time_t query_start_arg, const char *user_host,
+ const char *user_host,
uint user_host_len, ulonglong query_utime,
ulonglong lock_utime, bool is_command,
const char *sql_text, uint sql_text_len)
@@ -2847,8 +2898,12 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
:bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
need_start_event(TRUE),
+ group_commit_queue(0), group_commit_queue_busy(FALSE),
+ num_commits(0), num_group_commits(0),
sync_period_ptr(sync_period),
is_relay_log(0), signal_cnt(0),
+ checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF),
+ relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
description_event_for_exec(0), description_event_for_queue(0)
{
/*
@@ -2896,7 +2951,9 @@ void MYSQL_BIN_LOG::init_pthread_objects()
{
MYSQL_LOG::init_pthread_objects();
mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW);
+ mysql_mutex_setflags(&LOCK_index, MYF_NO_DEADLOCK_DETECTION);
mysql_cond_init(m_key_update_cond, &update_cond, 0);
+ mysql_cond_init(m_key_COND_queue_busy, &COND_queue_busy, 0);
}
@@ -3081,7 +3138,19 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
as we won't be able to reset it later
*/
if (io_cache_type == WRITE_CACHE)
- s.flags|= LOG_EVENT_BINLOG_IN_USE_F;
+ s.flags |= LOG_EVENT_BINLOG_IN_USE_F;
+ s.checksum_alg= is_relay_log ?
+ /* relay-log */
+ /* inherit master's A descriptor if one has been received */
+ (relay_log_checksum_alg=
+ (relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
+ relay_log_checksum_alg :
+ /* otherwise use slave's local preference of RL events verification */
+ (opt_slave_sql_verify_checksum == 0) ?
+ (uint8) BINLOG_CHECKSUM_ALG_OFF : (uint8) binlog_checksum_options):
+ /* binlog */
+ (uint8) binlog_checksum_options;
+ DBUG_ASSERT(s.checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if (!s.is_valid())
goto err;
s.dont_set_created= null_created_arg;
@@ -3123,6 +3192,11 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
if (flush_io_cache(&log_file) ||
mysql_file_sync(log_file.file, MYF(MY_WME)))
goto err;
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ strmake(last_commit_pos_file, log_file_name,
+ sizeof(last_commit_pos_file)-1);
+ last_commit_pos_offset= my_b_tell(&log_file);
+ mysql_mutex_unlock(&LOCK_commit_ordered);
if (write_file_name_to_index_file)
{
@@ -3403,6 +3477,13 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
ha_reset_logs(thd);
/*
+ We need to get both locks to be sure that no one is trying to
+ write to the index log file.
+ */
+ mysql_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_index);
+
+ /*
The following mutex is needed to ensure that no threads call
'delete thd' as we would then risk missing a 'rollback' from this
thread. If the transaction involved MyISAM tables, it should go
@@ -3410,13 +3491,6 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
*/
mysql_mutex_lock(&LOCK_thread_count);
- /*
- We need to get both locks to be sure that no one is trying to
- write to the index log file.
- */
- mysql_mutex_lock(&LOCK_log);
- mysql_mutex_lock(&LOCK_index);
-
/* Save variables so that we can reopen the log */
save_name=name;
name=0; // Protect against free
@@ -4257,8 +4331,15 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
We log the whole file name for log file as the user may decide
to change base names at some point.
*/
- Rotate_log_event r(new_name+dirname_length(new_name),
- 0, LOG_EVENT_OFFSET, is_relay_log ? Rotate_log_event::RELAY_LOG : 0);
+ Rotate_log_event r(new_name+dirname_length(new_name), 0, LOG_EVENT_OFFSET,
+ is_relay_log ? Rotate_log_event::RELAY_LOG : 0);
+ /*
+ The current relay-log's closing Rotate event must have checksum
+ value computed with an algorithm of the last relay-logged FD event.
+ */
+ if (is_relay_log)
+ r.checksum_alg= relay_log_checksum_alg;
+ DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) ||
(error= r.write(&log_file)))
{
@@ -4279,7 +4360,12 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
old_name=name;
name=0; // Don't free name
close(LOG_CLOSE_TO_BE_OPENED | LOG_CLOSE_INDEX);
-
+ if (log_type == LOG_BIN && checksum_alg_reset != BINLOG_CHECKSUM_ALG_UNDEF)
+ {
+ DBUG_ASSERT(!is_relay_log);
+ DBUG_ASSERT(binlog_checksum_options != checksum_alg_reset);
+ binlog_checksum_options= checksum_alg_reset;
+ }
/*
Note that at this point, log_state != LOG_CLOSED (important for is_open()).
*/
@@ -4421,6 +4507,10 @@ bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
err= mysql_file_sync(fd, MYF(MY_WME));
if (synced)
*synced= 1;
+#ifndef DBUG_OFF
+ if (opt_binlog_dbug_fsync_sleep > 0)
+ my_sleep(opt_binlog_dbug_fsync_sleep);
+#endif
}
return err;
}
@@ -4681,12 +4771,34 @@ void THD::binlog_set_stmt_begin() {
cache_mngr->trx_cache.set_prev_position(pos);
}
+static int
+binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
+{
+ int err= 0;
+ DBUG_ENTER("binlog_start_consistent_snapshot");
+
+ thd->binlog_setup_trx_data();
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+
+ /* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */
+ strmake(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file,
+ sizeof(cache_mngr->last_commit_pos_file)-1);
+ cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset;
+
+ trans_register_ha(thd, TRUE, hton);
+
+ DBUG_RETURN(err);
+}
/**
This function writes a table map to the binary log.
Note that in order to keep the signature uniform with related methods,
we use a redundant parameter to indicate whether a transactional table
was changed or not.
+
+ If with_annotate != NULL and
+ *with_annotate = TRUE write also Annotate_rows before the table map.
@param table a pointer to the table.
@param is_transactional @c true indicates a transactional table,
@@ -4694,7 +4806,8 @@ void THD::binlog_set_stmt_begin() {
@return
nonzero if an error pops up when writing the table map event.
*/
-int THD::binlog_write_table_map(TABLE *table, bool is_transactional)
+int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
+ my_bool *with_annotate)
{
int error;
DBUG_ENTER("THD::binlog_write_table_map");
@@ -4717,6 +4830,14 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional)
IO_CACHE *file=
cache_mngr->get_binlog_cache_log(use_trans_cache(this, is_transactional));
+ if (with_annotate && *with_annotate)
+ {
+ Annotate_rows_log_event anno(current_thd, is_transactional);
+ /* Annotate event should be written not more than once */
+ *with_annotate= 0;
+ if ((error= anno.write(file)))
+ DBUG_RETURN(error);
+ }
if ((error= the_event.write(file)))
DBUG_RETURN(error);
@@ -4871,10 +4992,12 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
}
/**
- Write an event to the binary log.
+ Write an event to the binary log. If with_annotate != NULL and
+ *with_annotate = TRUE write also Annotate_rows before the event
+ (this should happen only if the event is a Table_map).
*/
-bool MYSQL_BIN_LOG::write(Log_event *event_info)
+bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
{
THD *thd= event_info->thd;
bool error= 1;
@@ -4962,9 +5085,22 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
of the SQL command. If row-based binlogging, Insert_id, Rand
and other kind of "setting context" events are not needed.
*/
+
+ if (with_annotate && *with_annotate)
+ {
+ DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT);
+ Annotate_rows_log_event anno(thd, event_info->cache_type);
+ /* Annotate event should be written not more than once */
+ *with_annotate= 0;
+ if (anno.write(file))
+ goto err;
+ }
+
+ if (thd)
{
if (!thd->is_current_stmt_binlog_format_row())
{
+
if (thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt)
{
Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
@@ -5026,6 +5162,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
err:
if (event_info->use_direct_logging())
{
+ my_off_t offset= my_b_tell(file);
+
if (!error)
{
bool synced;
@@ -5034,7 +5172,7 @@ err:
goto unlock;
status_var_add(thd->status_var.binlog_bytes_written,
- my_b_tell(file) - my_org_b_tell);
+ offset - my_org_b_tell);
if ((error= RUN_HOOK(binlog_storage, after_flush,
(thd, log_file_name, file->pos_in_file, synced))))
@@ -5045,7 +5183,15 @@ err:
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
+
unlock:
+ /*
+ Take mutex to protect against a reader seeing partial writes of 64-bit
+ offset on 32-bit CPUs.
+ */
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ last_commit_pos_offset= offset;
+ mysql_mutex_unlock(&LOCK_commit_ordered);
mysql_mutex_unlock(&LOCK_log);
}
@@ -5162,12 +5308,14 @@ int MYSQL_BIN_LOG::rotate_and_purge(uint flags)
We give it a shot and try to write an incident event anyway
to the current log.
*/
- if (!write_incident(current_thd, FALSE))
+ if (!write_incident_already_locked(current_thd))
flush_and_sync(0);
#ifdef HAVE_REPLICATION
check_purge= true;
#endif
+ if (flags & RP_BINLOG_CHECKSUM_ALG_CHANGE)
+ checksum_alg_reset= BINLOG_CHECKSUM_ALG_UNDEF; // done
}
if (!(flags & RP_LOCK_LOG_IS_ALREADY_LOCKED))
mysql_mutex_unlock(&LOCK_log);
@@ -5196,6 +5344,33 @@ uint MYSQL_BIN_LOG::next_file_id()
}
+/**
+ Calculate checksum of possibly a part of an event containing at least
+ the whole common header.
+
+ @param buf the pointer to trans cache's buffer
+ @param off the offset of the beginning of the event in the buffer
+ @param event_len no-checksum length of the event
+ @param length the current size of the buffer
+
+ @param crc [in-out] the checksum
+
+ Event size in incremented by @c BINLOG_CHECKSUM_LEN.
+
+ @return 0 or number of unprocessed yet bytes of the event excluding
+ the checksum part.
+*/
+ static ulong fix_log_event_crc(uchar *buf, uint off, uint event_len,
+ uint length, ha_checksum *crc)
+{
+ ulong ret;
+ uchar *event_begin= buf + off;
+
+ ret= length >= off + event_len ? 0 : off + event_len - length;
+ *crc= my_checksum(*crc, event_begin, event_len - ret);
+ return ret;
+}
+
/*
Write the contents of a cache to the binary log.
@@ -5203,24 +5378,33 @@ uint MYSQL_BIN_LOG::next_file_id()
write_cache()
thd Current_thread
cache Cache to write to the binary log
- lock_log True if the LOCK_log mutex should be aquired, false otherwise
- sync_log True if the log should be flushed and synced
DESCRIPTION
Write the contents of the cache to the binary log. The cache will
be reset as a READ_CACHE to be able to read the contents from it.
- */
-int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
- bool sync_log)
-{
- Mutex_sentry sentry(lock_log ? &LOCK_log : NULL);
+ Reading from the trans cache with possible (per @c binlog_checksum_options)
+ adding checksum value and then fixing the length and the end_log_pos of
+ events prior to fill in the binlog cache.
+*/
+int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
+{
+ mysql_mutex_assert_owner(&LOCK_log);
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
return ER_ERROR_ON_WRITE;
uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
+ ulong remains= 0; // part of unprocessed yet netto length of the event
long val;
+ ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
uchar header[LOG_EVENT_HEADER_LEN];
+ ha_checksum crc= 0, crc_0= 0; // assignments to keep compiler happy
+ my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF);
+ uchar buf[BINLOG_CHECKSUM_LEN];
+
+ // while there is just one alg the following must hold:
+ DBUG_ASSERT(!do_checksum ||
+ binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32);
/*
The events in the buffer have incorrect end_log_pos data
@@ -5238,6 +5422,8 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
group= (uint)my_b_tell(&log_file);
hdr_offs= carry= 0;
+ if (do_checksum)
+ crc= crc_0= my_checksum(0L, NULL, 0);
do
{
@@ -5250,12 +5436,21 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN);
/* assemble both halves */
- memcpy(&header[carry], (char *)cache->read_pos, LOG_EVENT_HEADER_LEN - carry);
+ memcpy(&header[carry], (char *)cache->read_pos,
+ LOG_EVENT_HEADER_LEN - carry);
/* fix end_log_pos */
- val= uint4korr(&header[LOG_POS_OFFSET]) + group;
+ val= uint4korr(&header[LOG_POS_OFFSET]) + group +
+ (end_log_pos_inc+= (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
int4store(&header[LOG_POS_OFFSET], val);
+ if (do_checksum)
+ {
+ ulong len= uint4korr(&header[EVENT_LEN_OFFSET]);
+ /* fix len */
+ int4store(&header[EVENT_LEN_OFFSET], len + BINLOG_CHECKSUM_LEN);
+ }
+
/* write the first half of the split header */
if (my_b_write(&log_file, header, carry))
return ER_ERROR_ON_WRITE;
@@ -5265,11 +5460,20 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
copy fixed second half of header to cache so the correct
version will be written later.
*/
- memcpy((char *)cache->read_pos, &header[carry], LOG_EVENT_HEADER_LEN - carry);
+ memcpy((char *)cache->read_pos, &header[carry],
+ LOG_EVENT_HEADER_LEN - carry);
/* next event header at ... */
- hdr_offs = uint4korr(&header[EVENT_LEN_OFFSET]) - carry;
+ hdr_offs= uint4korr(&header[EVENT_LEN_OFFSET]) - carry -
+ (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
+ if (do_checksum)
+ {
+ DBUG_ASSERT(crc == crc_0 && remains == 0);
+ crc= my_checksum(crc, header, carry);
+ remains= uint4korr(header + EVENT_LEN_OFFSET) - carry -
+ BINLOG_CHECKSUM_LEN;
+ }
carry= 0;
}
@@ -5284,6 +5488,25 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
very next iteration, just "eventually").
*/
+ /* crc-calc the whole buffer */
+ if (do_checksum && hdr_offs >= length)
+ {
+
+ DBUG_ASSERT(remains != 0 && crc != crc_0);
+
+ crc= my_checksum(crc, cache->read_pos, length);
+ remains -= length;
+ if (my_b_write(&log_file, cache->read_pos, length))
+ return ER_ERROR_ON_WRITE;
+ if (remains == 0)
+ {
+ int4store(buf, crc);
+ if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
+ return ER_ERROR_ON_WRITE;
+ crc= crc_0;
+ }
+ }
+
while (hdr_offs < length)
{
/*
@@ -5291,6 +5514,26 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
we get the rest.
*/
+ if (do_checksum)
+ {
+ if (remains != 0)
+ {
+ /*
+ finish off with remains of the last event that crawls
+ from previous into the current buffer
+ */
+ DBUG_ASSERT(crc != crc_0);
+ crc= my_checksum(crc, cache->read_pos, hdr_offs);
+ int4store(buf, crc);
+ remains -= hdr_offs;
+ DBUG_ASSERT(remains == 0);
+ if (my_b_write(&log_file, cache->read_pos, hdr_offs) ||
+ my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
+ return ER_ERROR_ON_WRITE;
+ crc= crc_0;
+ }
+ }
+
if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
{
carry= length - hdr_offs;
@@ -5300,17 +5543,38 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
else
{
/* we've got a full event-header, and it came in one piece */
-
- uchar *log_pos= (uchar *)cache->read_pos + hdr_offs + LOG_POS_OFFSET;
+ uchar *ev= (uchar *)cache->read_pos + hdr_offs;
+ uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
+ uchar *log_pos= ev + LOG_POS_OFFSET;
/* fix end_log_pos */
- val= uint4korr(log_pos) + group;
+ val= uint4korr(log_pos) + group +
+ (end_log_pos_inc += (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
int4store(log_pos, val);
+ /* fix CRC */
+ if (do_checksum)
+ {
+ /* fix length */
+ int4store(ev + EVENT_LEN_OFFSET, event_len + BINLOG_CHECKSUM_LEN);
+ remains= fix_log_event_crc(cache->read_pos, hdr_offs, event_len,
+ length, &crc);
+ if (my_b_write(&log_file, ev,
+ remains == 0 ? event_len : length - hdr_offs))
+ return ER_ERROR_ON_WRITE;
+ if (remains == 0)
+ {
+ int4store(buf, crc);
+ if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
+ return ER_ERROR_ON_WRITE;
+ crc= crc_0; // crc is complete
+ }
+ }
+
/* next event header at ... */
- log_pos= (uchar *)cache->read_pos + hdr_offs + EVENT_LEN_OFFSET;
- hdr_offs += uint4korr(log_pos);
+ hdr_offs += event_len; // incr by the netto len
+ DBUG_ASSERT(!do_checksum || remains == 0 || hdr_offs >= length);
}
}
@@ -5326,17 +5590,19 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
}
/* Write data to the binary log file */
- if (my_b_write(&log_file, cache->read_pos, length))
- return ER_ERROR_ON_WRITE;
+ DBUG_EXECUTE_IF("fail_binlog_write_1",
+ errno= 28; return ER_ERROR_ON_WRITE;);
+ if (!do_checksum)
+ if (my_b_write(&log_file, cache->read_pos, length))
+ return ER_ERROR_ON_WRITE;
status_var_add(thd->status_var.binlog_bytes_written, length);
cache->read_pos=cache->read_end; // Mark buffer used up
} while ((length= my_b_fill(cache)));
DBUG_ASSERT(carry == 0);
-
- if (sync_log)
- return flush_and_sync(0);
+ DBUG_ASSERT(!do_checksum || remains == 0);
+ DBUG_ASSERT(!do_checksum || crc == crc_0);
return 0; // All OK
}
@@ -5370,31 +5636,50 @@ int query_error_code(THD *thd, bool not_killed)
return error;
}
-bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
+
+bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd)
{
uint error= 0;
- DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
-
- if (!is_open())
- DBUG_RETURN(error);
-
- LEX_STRING const write_error_msg=
- { C_STRING_WITH_LEN("error writing to the binary log") };
+ DBUG_ENTER("MYSQL_BIN_LOG::write_incident_already_locked");
Incident incident= INCIDENT_LOST_EVENTS;
Incident_log_event ev(thd, incident, write_error_msg);
- if (lock)
- mysql_mutex_lock(&LOCK_log);
- error= ev.write(&log_file);
- status_var_add(thd->status_var.binlog_bytes_written, ev.data_written);
- if (lock)
+
+ if (likely(is_open()))
+ {
+ error= ev.write(&log_file);
+ status_var_add(thd->status_var.binlog_bytes_written, ev.data_written);
+ }
+
+ DBUG_RETURN(error);
+}
+
+
+bool MYSQL_BIN_LOG::write_incident(THD *thd)
+{
+ uint error= 0;
+ my_off_t offset;
+ DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
+
+ mysql_mutex_lock(&LOCK_log);
+ if (likely(is_open()))
{
- if (!error && !(error= flush_and_sync(0)))
+ if (!(error= write_incident_already_locked(thd)) &&
+ !(error= flush_and_sync(0)))
{
signal_update();
error= rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
- mysql_mutex_unlock(&LOCK_log);
+ offset= my_b_tell(&log_file);
+ /*
+ Take mutex to protect against a reader seeing partial writes of 64-bit
+ offset on 32-bit CPUs.
+ */
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ last_commit_pos_offset= offset;
+ mysql_mutex_unlock(&LOCK_commit_ordered);
}
+ mysql_mutex_unlock(&LOCK_log);
+
DBUG_RETURN(error);
}
@@ -5422,111 +5707,425 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
'cache' needs to be reinitialized after this functions returns.
*/
-bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
- bool incident)
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
+ binlog_cache_mngr *cache_mngr,
+ Log_event *end_ev, bool all,
+ bool using_stmt_cache,
+ bool using_trx_cache)
+{
+ group_commit_entry entry;
+ DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
+
+ entry.thd= thd;
+ entry.cache_mngr= cache_mngr;
+ entry.error= 0;
+ entry.all= all;
+ entry.using_stmt_cache= using_stmt_cache;
+ entry.using_trx_cache= using_trx_cache;
+
+ /*
+ Log "BEGIN" at the beginning of every transaction. Here, a transaction is
+ either a BEGIN..COMMIT block or a single statement in autocommit mode.
+
+ Create the necessary events here, where we have the correct THD (and
+ thread context).
+
+ Due to group commit the actual writing to binlog may happen in a different
+ thread.
+ */
+ Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE,
+ TRUE, 0);
+ entry.begin_event= &qinfo;
+ entry.end_event= end_ev;
+ if (cache_mngr->stmt_cache.has_incident() ||
+ cache_mngr->trx_cache.has_incident())
+ {
+ Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg);
+ entry.incident_event= &inc_ev;
+ DBUG_RETURN(write_transaction_to_binlog_events(&entry));
+ }
+ else
+ {
+ entry.incident_event= NULL;
+ DBUG_RETURN(write_transaction_to_binlog_events(&entry));
+ }
+}
+
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{
- DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)");
+ /*
+ To facilitate group commit for the binlog, we first queue up ourselves in
+ the group commit queue. Then the first thread to enter the queue waits for
+ the LOCK_log mutex, and commits for everyone in the queue once it gets the
+ lock. Any other threads in the queue just wait for the first one to finish
+ the commit and wake them up.
+ */
+
+ entry->thd->clear_wakeup_ready();
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ group_commit_entry *orig_queue= group_commit_queue;
+ entry->next= orig_queue;
+ group_commit_queue= entry;
+
+ if (entry->cache_mngr->using_xa)
+ {
+ DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
+ run_prepare_ordered(entry->thd, entry->all);
+ DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
+ }
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+ DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered");
+
+ /*
+ The first in the queue handle group commit for all; the others just wait
+ to be signalled when group commit is done.
+ */
+ if (orig_queue != NULL)
+ entry->thd->wait_for_wakeup_ready();
+ else
+ trx_group_commit_leader(entry);
+
+ if (!opt_optimize_thread_scheduling)
+ {
+ /* For the leader, trx_group_commit_leader() already took the lock. */
+ if (orig_queue != NULL)
+ mysql_mutex_lock(&LOCK_commit_ordered);
+
+ DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered");
+ ++num_commits;
+ if (entry->cache_mngr->using_xa && !entry->error)
+ run_commit_ordered(entry->thd, entry->all);
+
+ group_commit_entry *next= entry->next;
+ if (!next)
+ {
+ group_commit_queue_busy= FALSE;
+ mysql_cond_signal(&COND_queue_busy);
+ DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered");
+ }
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+
+ if (next)
+ {
+ next->thd->signal_wakeup_ready();
+ }
+ }
+
+ if (likely(!entry->error))
+ return 0;
+
+ switch (entry->error)
+ {
+ case ER_ERROR_ON_WRITE:
+ my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, entry->commit_errno);
+ break;
+ case ER_ERROR_ON_READ:
+ my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH),
+ entry->error_cache->file_name, entry->commit_errno);
+ break;
+ default:
+ /*
+ There are not (and should not be) any errors thrown not covered above.
+ But just in case one is added later without updating the above switch
+ statement, include a catch-all.
+ */
+ my_printf_error(entry->error,
+ "Error writing transaction to binary log: %d",
+ MYF(ME_NOREFRESH), entry->error);
+ }
+
+ /*
+ Since we return error, this transaction XID will not be committed, so
+ we need to mark it as not needed for recovery (unlog() is not called
+ for a transaction if log_xid() fails).
+ */
+ if (entry->cache_mngr->using_xa && entry->cache_mngr->xa_xid)
+ mark_xid_done();
+
+ return 1;
+}
+
+/*
+ Do binlog group commit as the lead thread.
+
+ This must be called when this statement/transaction is queued at the start of
+ the group_commit_queue. It will wait to obtain the LOCK_log mutex, then group
+ commit all the transactions in the queue (more may have entered while waiting
+ for LOCK_log). After commit is done, all other threads in the queue will be
+ signalled.
+
+ */
+void
+MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
+{
+ uint xid_count= 0;
+ my_off_t commit_offset;
+ group_commit_entry *current;
+ group_commit_entry *last_in_queue;
+ DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader");
+ LINT_INIT(commit_offset);
+
+ /*
+ Lock the LOCK_log(), and once we get it, collect any additional writes
+ that queued up while we were waiting.
+ */
mysql_mutex_lock(&LOCK_log);
+ DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
+
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ current= group_commit_queue;
+ group_commit_queue= NULL;
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+ /* As the queue is in reverse order of entering, reverse it. */
+ group_commit_entry *queue= NULL;
+ last_in_queue= current;
+ while (current)
+ {
+ group_commit_entry *next= current->next;
+ current->next= queue;
+ queue= current;
+ current= next;
+ }
+ DBUG_ASSERT(leader == queue /* the leader should be first in queue */);
+
+ /* Now we have in queue the list of transactions to be committed in order. */
DBUG_ASSERT(is_open());
if (likely(is_open())) // Should always be true
{
/*
- We only bother to write to the binary log if there is anything
- to write.
- */
- if (my_b_tell(cache) > 0)
+ Commit every transaction in the queue.
+
+ Note that we are doing this in a different thread than the one running
+ the transaction! So we are limited in the operations we can do. In
+ particular, we cannot call my_error() on behalf of a transaction, as
+ that obtains the THD from thread local storage. Instead, we must set
+ current->error and let the thread do the error reporting itself once
+ we wake it up.
+ */
+ for (current= queue; current != NULL; current= current->next)
{
+ binlog_cache_mngr *cache_mngr= current->cache_mngr;
+
/*
- Log "BEGIN" at the beginning of every transaction. Here, a
- transaction is either a BEGIN..COMMIT block or a single
- statement in autocommit mode.
+ We already checked before that at least one cache is non-empty; if both
+ are empty we would have skipped calling into here.
*/
- Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE, TRUE, 0);
- if (qinfo.write(&log_file))
- goto err;
+ DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty());
- status_var_add(thd->status_var.binlog_bytes_written, qinfo.data_written);
+ current->error= write_transaction_or_stmt(current);
- DBUG_EXECUTE_IF("crash_before_writing_xid",
- {
- if ((write_error= write_cache(thd, cache, FALSE,
- TRUE)))
- DBUG_PRINT("info", ("error writing binlog cache: %d",
- write_error));
- DBUG_PRINT("info", ("crashing before writing xid"));
- DBUG_SUICIDE();
- });
-
- if ((write_error= write_cache(thd, cache, FALSE, FALSE)))
- goto err;
+ strmake(cache_mngr->last_commit_pos_file, log_file_name,
+ sizeof(cache_mngr->last_commit_pos_file)-1);
+ commit_offset= my_b_write_tell(&log_file);
+ cache_mngr->last_commit_pos_offset= commit_offset;
+ if (cache_mngr->using_xa && cache_mngr->xa_xid)
+ xid_count++;
+ }
- if (commit_event)
+ bool synced= 0;
+ if (flush_and_sync(&synced))
+ {
+ for (current= queue; current != NULL; current= current->next)
{
- if (commit_event->write(&log_file))
- goto err;
- status_var_add(thd->status_var.binlog_bytes_written,
- commit_event->data_written);
+ if (!current->error)
+ {
+ current->error= ER_ERROR_ON_WRITE;
+ current->commit_errno= errno;
+ current->error_cache= NULL;
+ }
}
-
- if (incident && write_incident(thd, FALSE))
- goto err;
-
- bool synced= 0;
- if (flush_and_sync(&synced))
- goto err;
- DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_SUICIDE(););
- if (cache->error) // Error on read
+ }
+ else
+ {
+ bool any_error= false;
+ bool all_error= true;
+ for (current= queue; current != NULL; current= current->next)
{
- sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno);
- write_error=1; // Don't give more errors
- goto err;
+ if (!current->error &&
+ RUN_HOOK(binlog_storage, after_flush,
+ (current->thd, log_file_name, log_file.pos_in_file, synced)))
+ {
+ current->error= ER_ERROR_ON_WRITE;
+ current->commit_errno= -1;
+ current->error_cache= NULL;
+ any_error= true;
+ }
+ else
+ all_error= false;
}
- if (RUN_HOOK(binlog_storage, after_flush,
- (thd, log_file_name, log_file.pos_in_file, synced)))
- {
+ if (any_error)
sql_print_error("Failed to run 'after_flush' hooks");
- write_error=1;
- goto err;
- }
-
- signal_update();
+ if (!all_error)
+ signal_update();
}
/*
- if commit_event is Xid_log_event, increase the number of
- prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated
+ if any commit_events are Xid_log_event, increase the number of
+ prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated
if there're prepared xids in it - see the comment in new_file() for
an explanation.
- If the commit_event is not Xid_log_event (then it's a Query_log_event)
- rotate binlog, if necessary.
+ If no Xid_log_events (then it's all Query_log_event) rotate binlog,
+ if necessary.
*/
- if (commit_event && commit_event->get_type_code() == XID_EVENT)
+ if (xid_count > 0)
{
- mysql_mutex_lock(&LOCK_prep_xids);
- prepared_xids++;
- mysql_mutex_unlock(&LOCK_prep_xids);
+ mark_xids_active(xid_count);
}
else
+ {
if (rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED))
- goto err;
+ {
+ /*
+ If we fail to rotate, which thread should get the error?
+ We give the error to the *last* transaction thread; that seems to
+ make the most sense, as it was the last to write to the log.
+ */
+ last_in_queue->error= ER_ERROR_ON_WRITE;
+ last_in_queue->commit_errno= errno;
+ }
+ }
}
+
+ DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ last_commit_pos_offset= commit_offset;
+ /*
+ We cannot unlock LOCK_log until we have locked LOCK_commit_ordered;
+ otherwise scheduling could allow the next group commit to run ahead of us,
+ messing up the order of commit_ordered() calls. But as soon as
+ LOCK_commit_ordered is obtained, we can let the next group commit start.
+ */
mysql_mutex_unlock(&LOCK_log);
+ DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log");
+ ++num_group_commits;
- DBUG_RETURN(0);
+ if (!opt_optimize_thread_scheduling)
+ {
+ /*
+ If we want to run commit_ordered() each in the transaction's own thread
+ context, then we need to mark the queue reserved; we need to finish all
+ threads in one group commit before the next group commit can be allowed
+ to proceed, and we cannot unlock a simple pthreads mutex in a different
+ thread from the one that locked it.
+ */
-err:
- if (!write_error)
+ while (group_commit_queue_busy)
+ mysql_cond_wait(&COND_queue_busy, &LOCK_commit_ordered);
+ group_commit_queue_busy= TRUE;
+
+ /* Note that we return with LOCK_commit_ordered locked! */
+ DBUG_VOID_RETURN;
+ }
+
+ /*
+ Wakeup each participant waiting for our group commit, first calling the
+ commit_ordered() methods for any transactions doing 2-phase commit.
+ */
+ current= queue;
+ while (current != NULL)
{
- write_error= 1;
- sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
+ group_commit_entry *next;
+
+ DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered");
+ ++num_commits;
+ if (current->cache_mngr->using_xa && !current->error)
+ run_commit_ordered(current->thd, current->all);
+
+ /*
+ Careful not to access current->next after waking up the other thread! As
+ it may change immediately after wakeup.
+ */
+ next= current->next;
+ if (current != leader) // Don't wake up ourself
+ current->thd->signal_wakeup_ready();
+ current= next;
}
- mysql_mutex_unlock(&LOCK_log);
- DBUG_RETURN(1);
+ DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered");
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+
+ DBUG_VOID_RETURN;
}
+int
+MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry)
+{
+ binlog_cache_mngr *mngr= entry->cache_mngr;
+
+ if (entry->begin_event->write(&log_file))
+ return ER_ERROR_ON_WRITE;
+ status_var_add(entry->thd->status_var.binlog_bytes_written,
+ entry->begin_event->data_written);
+
+ if (entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
+ write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE)))
+ {
+ entry->error_cache= &mngr->stmt_cache.cache_log;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_WRITE;
+ }
+
+ if (entry->using_trx_cache && !mngr->trx_cache.empty())
+ {
+ DBUG_EXECUTE_IF("crash_before_writing_xid",
+ {
+ if ((write_cache(entry->thd,
+ mngr->get_binlog_cache_log(TRUE))))
+ DBUG_PRINT("info", ("error writing binlog cache"));
+ else
+ flush_and_sync(0);
+
+ DBUG_PRINT("info", ("crashing before writing xid"));
+ DBUG_SUICIDE();
+ });
+
+ if (write_cache(entry->thd, mngr->get_binlog_cache_log(TRUE)))
+ {
+ entry->error_cache= &mngr->trx_cache.cache_log;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_WRITE;
+ }
+ }
+
+ if (entry->end_event->write(&log_file))
+ {
+ entry->error_cache= NULL;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_WRITE;
+ }
+ status_var_add(entry->thd->status_var.binlog_bytes_written,
+ entry->end_event->data_written);
+
+ if (entry->incident_event)
+ {
+ if (entry->incident_event->write(&log_file))
+ {
+ entry->error_cache= NULL;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_WRITE;
+ }
+ }
+
+ if (mngr->get_binlog_cache_log(FALSE)->error) // Error on read
+ {
+ entry->error_cache= &mngr->stmt_cache.cache_log;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_READ;
+ }
+ if (mngr->get_binlog_cache_log(TRUE)->error) // Error on read
+ {
+ entry->error_cache= &mngr->trx_cache.cache_log;
+ entry->commit_errno= errno;
+ return ER_ERROR_ON_READ;
+ }
+
+ return 0;
+}
+
/**
Wait until we get a signal that the relay log has been updated.
@@ -5608,6 +6207,11 @@ void MYSQL_BIN_LOG::close(uint exiting)
(exiting & LOG_CLOSE_STOP_EVENT))
{
Stop_log_event s;
+ // the checksumming rule for relay-log case is similar to Rotate
+ s.checksum_alg= is_relay_log ?
+ (uint8) relay_log_checksum_alg : (uint8) binlog_checksum_options;
+ DBUG_ASSERT(!is_relay_log ||
+ relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
s.write(&log_file);
bytes_written+= s.data_written;
signal_update();
@@ -5724,9 +6328,23 @@ static bool test_if_number(register const char *str,
void sql_perror(const char *message)
{
-#ifdef HAVE_STRERROR
+#if defined(_WIN32)
+ char* buf;
+ DWORD dw= GetLastError();
+ if (FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
+ FORMAT_MESSAGE_IGNORE_INSERTS, NULL, dw,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPSTR)&buf, 0, NULL ) > 0)
+ {
+ sql_print_error("%s: %s",message, buf);
+ LocalFree((HLOCAL)buf);
+ }
+ else
+ {
+ sql_print_error("%s", message);
+ }
+#elif defined(HAVE_STRERROR)
sql_print_error("%s: %s",message, strerror(errno));
-#else
+#else
perror(message);
#endif
}
@@ -5931,6 +6549,148 @@ void sql_print_information(const char *format, ...)
}
+void
+TC_LOG::run_prepare_ordered(THD *thd, bool all)
+{
+ Ha_trx_info *ha_info=
+ all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list;
+
+ mysql_mutex_assert_owner(&LOCK_prepare_ordered);
+ for (; ha_info; ha_info= ha_info->next())
+ {
+ handlerton *ht= ha_info->ht();
+ if (!ht->prepare_ordered)
+ continue;
+ ht->prepare_ordered(ht, thd, all);
+ }
+}
+
+
+void
+TC_LOG::run_commit_ordered(THD *thd, bool all)
+{
+ Ha_trx_info *ha_info=
+ all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list;
+
+ mysql_mutex_assert_owner(&LOCK_commit_ordered);
+ for (; ha_info; ha_info= ha_info->next())
+ {
+ handlerton *ht= ha_info->ht();
+ if (!ht->commit_ordered)
+ continue;
+ ht->commit_ordered(ht, thd, all);
+ DEBUG_SYNC(thd, "commit_after_run_commit_ordered");
+ }
+}
+
+
+int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
+ bool need_prepare_ordered,
+ bool need_commit_ordered)
+{
+ int cookie;
+ struct commit_entry entry;
+ bool is_group_commit_leader;
+ LINT_INIT(is_group_commit_leader);
+
+ if (need_prepare_ordered)
+ {
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ run_prepare_ordered(thd, all);
+ if (need_commit_ordered)
+ {
+ /*
+ Must put us in queue so we can run_commit_ordered() in same sequence
+ as we did run_prepare_ordered().
+ */
+ thd->clear_wakeup_ready();
+ entry.thd= thd;
+ commit_entry *previous_queue= commit_ordered_queue;
+ entry.next= previous_queue;
+ commit_ordered_queue= &entry;
+ is_group_commit_leader= (previous_queue == NULL);
+ }
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+ }
+
+ cookie= 0;
+ if (xid)
+ cookie= log_one_transaction(xid);
+
+ if (need_commit_ordered)
+ {
+ if (need_prepare_ordered)
+ {
+ /*
+ We did the run_prepare_ordered() serialised, then ran the log_xid() in
+ parallel. Now we have to do run_commit_ordered() serialised in the
+ same sequence as run_prepare_ordered().
+
+ We do this starting from the head of the queue, each thread doing
+ run_commit_ordered() and signalling the next in queue.
+ */
+ if (is_group_commit_leader)
+ {
+ /* The first in queue starts the ball rolling. */
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ while (commit_ordered_queue_busy)
+ mysql_cond_wait(&COND_queue_busy, &LOCK_prepare_ordered);
+ commit_entry *queue= commit_ordered_queue;
+ commit_ordered_queue= NULL;
+ /*
+ Mark the queue busy while we bounce it from one thread to the
+ next.
+ */
+ commit_ordered_queue_busy= true;
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+
+ /* Reverse the queue list so we get correct order. */
+ commit_entry *prev= NULL;
+ while (queue)
+ {
+ commit_entry *next= queue->next;
+ queue->next= prev;
+ prev= queue;
+ queue= next;
+ }
+ DBUG_ASSERT(prev == &entry && prev->thd == thd);
+ }
+ else
+ {
+ /* Not first in queue; just wait until previous thread wakes us up. */
+ thd->wait_for_wakeup_ready();
+ }
+ }
+
+ /* Only run commit_ordered() if log_xid was successful. */
+ if (cookie)
+ {
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ run_commit_ordered(thd, all);
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+ }
+
+ if (need_prepare_ordered)
+ {
+ commit_entry *next= entry.next;
+ if (next)
+ {
+ next->thd->signal_wakeup_ready();
+ }
+ else
+ {
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ commit_ordered_queue_busy= false;
+ mysql_cond_signal(&COND_queue_busy);
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
+ }
+ }
+ }
+
+ return cookie;
+}
+
+
/********* transaction coordinator log for 2pc - mmap() based solution *******/
/*
@@ -6068,6 +6828,7 @@ int TC_LOG_MMAP::open(const char *opt_name)
mysql_mutex_init(key_LOCK_pool, &LOCK_pool, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_active, &COND_active, 0);
mysql_cond_init(key_COND_pool, &COND_pool, 0);
+ mysql_cond_init(key_COND_queue_busy, &COND_queue_busy, 0);
inited=6;
@@ -6075,6 +6836,8 @@ int TC_LOG_MMAP::open(const char *opt_name)
active=pages;
pool=pages+1;
pool_last=pages+npages-1;
+ commit_ordered_queue= NULL;
+ commit_ordered_queue_busy= false;
return 0;
@@ -6180,7 +6943,7 @@ int TC_LOG_MMAP::overflow()
to the position in memory where xid was logged to.
*/
-int TC_LOG_MMAP::log_xid(THD *thd, my_xid xid)
+int TC_LOG_MMAP::log_one_transaction(my_xid xid)
{
int err;
PAGE *p;
@@ -6350,6 +7113,8 @@ void TC_LOG_MMAP::close()
mysql_mutex_destroy(&LOCK_active);
mysql_mutex_destroy(&LOCK_pool);
mysql_cond_destroy(&COND_pool);
+ mysql_cond_destroy(&COND_active);
+ mysql_cond_destroy(&COND_queue_busy);
case 5:
data[0]='A'; // garble the first (signature) byte, in case mysql_file_delete fails
case 4:
@@ -6529,7 +7294,8 @@ int TC_LOG_BINLOG::open(const char *opt_name)
goto err;
}
- if ((ev= Log_event::read_log_event(&log, 0, &fdle)) &&
+ if ((ev= Log_event::read_log_event(&log, 0, &fdle,
+ opt_master_verify_checksum)) &&
ev->get_type_code() == FORMAT_DESCRIPTION_EVENT &&
ev->flags & LOG_EVENT_BINLOG_IN_USE_F)
{
@@ -6559,42 +7325,86 @@ void TC_LOG_BINLOG::close()
mysql_cond_destroy(&COND_prep_xids);
}
-/**
- @todo
- group commit
-
- @retval
- 0 error
- @retval
- 1 success
+/*
+ Do a binlog log_xid() for a group of transactions, linked through
+ thd->next_commit_ordered.
*/
-int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid)
+int
+TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
+ bool need_prepare_ordered __attribute__((unused)),
+ bool need_commit_ordered __attribute__((unused)))
{
- DBUG_ENTER("TC_LOG_BINLOG::log");
+ int err;
+ DBUG_ENTER("TC_LOG_BINLOG::log_and_order");
+
binlog_cache_mngr *cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
- /*
- We always commit the entire transaction when writing an XID. Also
- note that the return value is inverted.
- */
- DBUG_RETURN(!binlog_commit_flush_stmt_cache(thd, cache_mngr) &&
- !binlog_commit_flush_trx_cache(thd, cache_mngr, xid));
+
+ cache_mngr->using_xa= TRUE;
+ cache_mngr->xa_xid= xid;
+ err= binlog_commit_flush_xid_caches(thd, cache_mngr, all, xid);
+
+ DEBUG_SYNC(thd, "binlog_after_log_and_order");
+
+ DBUG_RETURN(!err);
}
-int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
+/*
+ After an XID is logged, we need to hold on to the current binlog file until
+ it is fully committed in the storage engine. The reason is that crash
+ recovery only looks at the latest binlog, so we must make sure there are no
+ outstanding prepared (but not committed) transactions before rotating the
+ binlog.
+
+ To handle this, we keep a count of outstanding XIDs. This function is used
+ to increase this count when committing one or more transactions to the
+ binary log.
+*/
+void
+TC_LOG_BINLOG::mark_xids_active(uint xid_count)
{
- DBUG_ENTER("TC_LOG_BINLOG::unlog");
+ DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active");
+ DBUG_PRINT("info", ("xid_count=%u", xid_count));
+ mysql_mutex_lock(&LOCK_prep_xids);
+ prepared_xids+= xid_count;
+ mysql_mutex_unlock(&LOCK_prep_xids);
+ DBUG_VOID_RETURN;
+}
+
+/*
+ Once an XID is committed, it is safe to rotate the binary log, as it can no
+ longer be needed during crash recovery.
+
+ This function is called to mark an XID this way. It needs to decrease the
+ count of pending XIDs, and signal the log rotator thread when it reaches zero.
+*/
+void
+TC_LOG_BINLOG::mark_xid_done()
+{
+ my_bool send_signal;
+
+ DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done");
mysql_mutex_lock(&LOCK_prep_xids);
// prepared_xids can be 0 if the transaction had ignorable errors.
DBUG_ASSERT(prepared_xids >= 0);
if (prepared_xids > 0)
prepared_xids--;
- if (prepared_xids == 0) {
+ send_signal= (prepared_xids == 0);
+ mysql_mutex_unlock(&LOCK_prep_xids);
+ if (send_signal) {
DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
mysql_cond_signal(&COND_prep_xids);
}
- mysql_mutex_unlock(&LOCK_prep_xids);
- DBUG_RETURN(rotate_and_purge(0)); // as ::write() did not rotate
+ DBUG_VOID_RETURN;
+}
+
+int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
+{
+ DBUG_ENTER("TC_LOG_BINLOG::unlog");
+ if (xid)
+ mark_xid_done();
+ /* As ::write_transaction_to_binlog() did not rotate, do it here. */
+ DBUG_RETURN(rotate_and_purge(0));
}
int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
@@ -6612,7 +7422,9 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
fdle->flags&= ~LOG_EVENT_BINLOG_IN_USE_F; // abort on the first error
- while ((ev= Log_event::read_log_event(log,0,fdle)) && ev->is_valid())
+ while ((ev= Log_event::read_log_event(log, 0, fdle,
+ opt_master_verify_checksum))
+ && ev->is_valid())
{
if (ev->get_type_code() == XID_EVENT)
{
@@ -6663,9 +7475,170 @@ ulonglong mysql_bin_log_file_pos(void)
{
return (ulonglong) mysql_bin_log.get_log_file()->pos_in_file;
}
+/*
+ Get the current position of the MySQL binlog for transaction currently being
+ committed.
+
+ This is valid to call from within storage engine commit_ordered() and
+ commit() methods only.
+
+ Since it stores the position inside THD, it is safe to call without any
+ locking.
+*/
+void
+mysql_bin_log_commit_pos(THD *thd, ulonglong *out_pos, const char **out_file)
+{
+ binlog_cache_mngr *cache_mngr;
+ if (opt_bin_log &&
+ (cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton)))
+ {
+ *out_file= cache_mngr->last_commit_pos_file;
+ *out_pos= (ulonglong)(cache_mngr->last_commit_pos_offset);
+ }
+ else
+ {
+ *out_file= NULL;
+ *out_pos= 0;
+ }
+}
#endif /* INNODB_COMPATIBILITY_HOOKS */
+static void
+binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save)
+{
+ ulong value= *((ulong *)save);
+
+ mysql_mutex_lock(mysql_bin_log.get_log_lock());
+ if(mysql_bin_log.is_open())
+ {
+ uint flags= RP_FORCE_ROTATE | RP_LOCK_LOG_IS_ALREADY_LOCKED |
+ (binlog_checksum_options != (uint) value?
+ RP_BINLOG_CHECKSUM_ALG_CHANGE : 0);
+ if (flags & RP_BINLOG_CHECKSUM_ALG_CHANGE)
+ mysql_bin_log.checksum_alg_reset= (uint8) value;
+ mysql_bin_log.rotate_and_purge(flags);
+ }
+ else
+ {
+ binlog_checksum_options= value;
+ }
+ DBUG_ASSERT((ulong) binlog_checksum_options == value);
+ DBUG_ASSERT(mysql_bin_log.checksum_alg_reset == BINLOG_CHECKSUM_ALG_UNDEF);
+ mysql_mutex_unlock(mysql_bin_log.get_log_lock());
+}
+
+
+static int show_binlog_vars(THD *thd, SHOW_VAR *var, char *buff)
+{
+ mysql_bin_log.set_status_variables(thd);
+ var->type= SHOW_ARRAY;
+ var->value= (char *)&binlog_status_vars_detail;
+ return 0;
+}
+
+static SHOW_VAR binlog_status_vars_top[]= {
+ {"binlog", (char *) &show_binlog_vars, SHOW_FUNC},
+ {NullS, NullS, SHOW_LONG}
+};
+
+static MYSQL_SYSVAR_BOOL(
+ optimize_thread_scheduling,
+ opt_optimize_thread_scheduling,
+ PLUGIN_VAR_READONLY,
+ "Run fast part of group commit in a single thread, to optimize kernel "
+ "thread scheduling. On by default. Disable to run each transaction in group "
+ "commit in its own thread, which can be slower at very high concurrency. "
+ "This option is mostly for testing one algorithm versus the other, and it "
+ "should not normally be necessary to change it.",
+ NULL,
+ NULL,
+ 1);
+
+static MYSQL_SYSVAR_ENUM(
+ checksum,
+ binlog_checksum_options,
+ PLUGIN_VAR_RQCMDARG,
+ "Type of BINLOG_CHECKSUM_ALG. Include checksum for "
+ "log events in the binary log. Possible values are NONE and CRC32; "
+ "default is NONE.",
+ NULL,
+ binlog_checksum_update,
+ BINLOG_CHECKSUM_ALG_OFF,
+ &binlog_checksum_typelib);
+
+#ifndef DBUG_OFF
+static MYSQL_SYSVAR_ULONG(
+ dbug_fsync_sleep,
+ opt_binlog_dbug_fsync_sleep,
+ PLUGIN_VAR_RQCMDARG,
+ "Extra sleep (in microseconds) to add to binlog fsync(), for debugging",
+ NULL,
+ NULL,
+ 0,
+ 0,
+ ULONG_MAX,
+ 0);
+#endif
+
+static struct st_mysql_sys_var *binlog_sys_vars[]=
+{
+ MYSQL_SYSVAR(optimize_thread_scheduling),
+ MYSQL_SYSVAR(checksum),
+#ifndef DBUG_OFF
+ MYSQL_SYSVAR(dbug_fsync_sleep),
+#endif
+ NULL
+};
+
+
+/*
+ Copy out the non-directory part of binlog position filename for the
+ `binlog_snapshot_file' status variable, same way as it is done for
+ SHOW MASTER STATUS.
+*/
+static void
+set_binlog_snapshot_file(const char *src)
+{
+ int dir_len = dirname_length(src);
+ strmake(binlog_snapshot_file, src + dir_len, sizeof(binlog_snapshot_file)-1);
+}
+
+/*
+ Copy out current values of status variables, for SHOW STATUS or
+ information_schema.global_status.
+
+ This is called only under LOCK_status, so we can fill in a static array.
+*/
+void
+TC_LOG_BINLOG::set_status_variables(THD *thd)
+{
+ binlog_cache_mngr *cache_mngr;
+
+ if (thd && opt_bin_log)
+ cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ else
+ cache_mngr= 0;
+
+ bool have_snapshot= (cache_mngr && cache_mngr->last_commit_pos_file[0] != 0);
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ binlog_status_var_num_commits= this->num_commits;
+ binlog_status_var_num_group_commits= this->num_group_commits;
+ if (!have_snapshot)
+ {
+ set_binlog_snapshot_file(last_commit_pos_file);
+ binlog_snapshot_position= last_commit_pos_offset;
+ }
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+
+ if (have_snapshot)
+ {
+ set_binlog_snapshot_file(cache_mngr->last_commit_pos_file);
+ binlog_snapshot_position= cache_mngr->last_commit_pos_offset;
+ }
+}
+
struct st_mysql_storage_engine binlog_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
@@ -6680,8 +7653,8 @@ mysql_declare_plugin(binlog)
binlog_init, /* Plugin Init */
NULL, /* Plugin Deinit */
0x0100 /* 1.0 */,
- NULL, /* status variables */
- NULL, /* system variables */
+ binlog_status_vars_top, /* status variables */
+ binlog_sys_vars, /* system variables */
NULL /* config options */
}
mysql_declare_plugin_end;
@@ -6696,8 +7669,8 @@ maria_declare_plugin(binlog)
binlog_init, /* Plugin Init */
NULL, /* Plugin Deinit */
0x0100 /* 1.0 */,
- NULL, /* status variables */
- NULL, /* system variables */
+ binlog_status_vars_top, /* status variables */
+ binlog_sys_vars, /* system variables */
"1.0", /* string version */
MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
}