summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc507
1 files changed, 306 insertions, 201 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 41e413e9ff7..6ef1c1ea912 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -2,8 +2,7 @@
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
+ the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
@@ -21,6 +20,7 @@
#include "mysql_priv.h"
#include "sql_repl.h"
#include "rpl_filter.h"
+#include "rpl_rli.h"
#include <my_dir.h>
#include <stdarg.h>
@@ -32,15 +32,6 @@
#include <mysql/plugin.h>
-/*
- Define placement versions of operator new and operator delete since
- we cannot be sure that the <new> include exists.
- */
-inline void *operator new(size_t, void *ptr) { return ptr; }
-inline void *operator new[](size_t, void *ptr) { return ptr; }
-inline void operator delete(void*, void*) { /* Do nothing */ }
-inline void operator delete[](void*, void*) { /* Do nothing */ }
-
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
#define MAX_USER_HOST_SIZE 512
@@ -76,12 +67,47 @@ sql_print_message_func sql_print_message_handlers[3] =
char *make_default_log_name(char *buff,const char* log_ext)
{
- strmake(buff, glob_hostname, FN_REFLEN-5);
+ strmake(buff, pidfile_name, FN_REFLEN-5);
return fn_format(buff, buff, mysql_data_home, log_ext,
- MYF(MY_UNPACK_FILENAME|MY_APPEND_EXT));
+ MYF(MY_UNPACK_FILENAME|MY_REPLACE_EXT));
}
/*
+ Helper class to hold a mutex for the duration of the
+ block.
+
+ Eliminates the need for explicit unlocking of mutexes on, e.g.,
+ error returns. On passing a null pointer, the sentry will not do
+ anything.
+ */
+class Mutex_sentry
+{
+public:
+ Mutex_sentry(pthread_mutex_t *mutex)
+ : m_mutex(mutex)
+ {
+ if (m_mutex)
+ pthread_mutex_lock(mutex);
+ }
+
+ ~Mutex_sentry()
+ {
+ if (m_mutex)
+ pthread_mutex_unlock(m_mutex);
+#ifndef DBUG_OFF
+ m_mutex= 0;
+#endif
+ }
+
+private:
+ pthread_mutex_t *m_mutex;
+
+ // It's not allowed to copy this object in any way
+ Mutex_sentry(Mutex_sentry const&);
+ void operator=(Mutex_sentry const&);
+};
+
+/*
Helper class to store binary log transaction data.
*/
class binlog_trx_data {
@@ -113,9 +139,13 @@ public:
*/
void truncate(my_off_t pos)
{
+ DBUG_PRINT("info", ("truncating to position %lu", (ulong) pos));
+ DBUG_PRINT("info", ("before_stmt_pos=%lu", (ulong) pos));
delete pending();
set_pending(0);
reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0);
+ if (pos < before_stmt_pos)
+ before_stmt_pos= MY_OFF_T_UNDEF;
}
/*
@@ -230,7 +260,7 @@ bool Log_to_csv_event_handler::open_log_table(uint log_table_type)
table->table_name_length= 8;
break;
default:
- DBUG_ASSERT(0);
+ assert(0); // Impossible
}
/*
@@ -274,6 +304,7 @@ bool Log_to_csv_event_handler::open_log_table(uint log_table_type)
{
table->table->use_all_columns();
table->table->locked_by_logger= TRUE;
+ table->table->no_replicate= TRUE;
}
/* restore thread settings */
if (curr)
@@ -539,11 +570,21 @@ bool Log_to_csv_event_handler::
if (query_start_arg)
{
+ /*
+ A TIME field can not hold the full longlong range; query_time or
+ lock_time may be truncated without warning here, if greater than
+ 839 hours (~35 days)
+ */
+ MYSQL_TIME t;
+ t.neg= 0;
+
/* fill in query_time field */
- if (table->field[2]->store(query_time, TRUE))
+ calc_time_from_sec(&t, (long) min(query_time, (longlong) TIME_MAX_VALUE_SECONDS), 0);
+ if (table->field[2]->store_time(&t, MYSQL_TIMESTAMP_TIME))
goto err;
/* lock_time */
- if (table->field[3]->store(lock_time, TRUE))
+ calc_time_from_sec(&t, (long) min(lock_time, (longlong) TIME_MAX_VALUE_SECONDS), 0);
+ if (table->field[3]->store_time(&t, MYSQL_TIMESTAMP_TIME))
goto err;
/* rows_sent */
if (table->field[4]->store((longlong) thd->sent_row_count, TRUE))
@@ -869,9 +910,9 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length,
bool is_command= FALSE;
char user_host_buff[MAX_USER_HOST_SIZE];
- my_time_t current_time;
+ time_t current_time;
Security_context *sctx= thd->security_ctx;
- uint message_buff_len= 0, user_host_len= 0;
+ uint user_host_len= 0;
longlong query_time= 0, lock_time= 0;
/*
@@ -1124,7 +1165,7 @@ void LOGGER::deactivate_log_handler(THD *thd, uint log_type)
log_thd= table_log_handler->general_log_thd;
break;
default:
- DBUG_ASSERT(0);
+ assert(0); // Impossible
}
if (!(*tmp_opt))
@@ -1273,7 +1314,7 @@ void Log_to_csv_event_handler::
table= &slow_log;
break;
default:
- DBUG_ASSERT(0);
+ assert(0); // Impossible
}
/*
@@ -1394,7 +1435,7 @@ static int binlog_close_connection(handlerton *hton, THD *thd)
DBUG_ASSERT(mysql_bin_log.is_open() && trx_data->empty());
thd->ha_data[binlog_hton->slot]= 0;
trx_data->~binlog_trx_data();
- my_free((gptr)trx_data, MYF(0));
+ my_free((uchar*)trx_data, MYF(0));
return 0;
}
@@ -1483,12 +1524,11 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
If rolling back a statement in a transaction, we truncate the
transaction cache to remove the statement.
-
*/
if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
trx_data->reset();
- else
- trx_data->truncate(trx_data->before_stmt_pos); // ...statement
+ else // ...statement
+ trx_data->truncate(trx_data->before_stmt_pos);
/*
We need to step the table map version on a rollback to ensure
@@ -1507,23 +1547,21 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all)
do nothing.
just pretend we can do 2pc, so that MySQL won't
switch to 1pc.
- real work will be done in MYSQL_BIN_LOG::log()
+ real work will be done in MYSQL_BIN_LOG::log_xid()
*/
return 0;
}
static int binlog_commit(handlerton *hton, THD *thd, bool all)
{
- int error= 0;
DBUG_ENTER("binlog_commit");
binlog_trx_data *const trx_data=
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
- IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_ASSERT(mysql_bin_log.is_open());
- if (all && trx_data->empty())
+ if (trx_data->empty())
{
- // we're here because trans_log was flushed in MYSQL_BIN_LOG::log()
+ // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid()
trx_data->reset();
DBUG_RETURN(0);
}
@@ -1547,7 +1585,6 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
int error=0;
binlog_trx_data *const trx_data=
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
- IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_ASSERT(mysql_bin_log.is_open());
if (trx_data->empty()) {
@@ -1561,8 +1598,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
table. Such cases should be rare (updating a
non-transactional table inside a transaction...)
*/
- if (unlikely(thd->options & (OPTION_STATUS_NO_TRANS_UPDATE |
- OPTION_KEEP_LOG)))
+ if (unlikely(thd->no_trans_update.all || (thd->options & OPTION_KEEP_LOG)))
{
Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE);
qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE)
@@ -1610,9 +1646,6 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
{
DBUG_ENTER("binlog_savepoint_rollback");
- binlog_trx_data *const trx_data=
- (binlog_trx_data*) thd->ha_data[binlog_hton->slot];
- IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_ASSERT(mysql_bin_log.is_open());
/*
@@ -1620,10 +1653,9 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
non-transactional table. Otherwise, truncate the binlog cache starting
from the SAVEPOINT command.
*/
- if (unlikely(thd->options &
- (OPTION_STATUS_NO_TRANS_UPDATE | OPTION_KEEP_LOG)))
+ if (unlikely(thd->no_trans_update.all || (thd->options & OPTION_KEEP_LOG)))
{
- int const error=
+ int error=
thd->binlog_query(THD::STMT_QUERY_TYPE,
thd->query, thd->query_length, TRUE, FALSE);
DBUG_RETURN(error);
@@ -1632,12 +1664,13 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
DBUG_RETURN(0);
}
+
int check_binlog_magic(IO_CACHE* log, const char** errmsg)
{
char magic[4];
DBUG_ASSERT(my_b_tell(log) == 0);
- if (my_b_read(log, (byte*) magic, sizeof(magic)))
+ if (my_b_read(log, (uchar*) magic, sizeof(magic)))
{
*errmsg = "I/O error reading the header from the binary log";
sql_print_error("%s, errno=%d, io cache code=%d", *errmsg, my_errno,
@@ -1652,6 +1685,7 @@ int check_binlog_magic(IO_CACHE* log, const char** errmsg)
return 0;
}
+
File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg)
{
File file;
@@ -1689,7 +1723,7 @@ err:
#ifdef __NT__
static int eventSource = 0;
-void setup_windows_event_source()
+static void setup_windows_event_source()
{
HKEY hRegKey= NULL;
DWORD dwError= 0;
@@ -1710,7 +1744,7 @@ void setup_windows_event_source()
/* Register EventMessageFile */
dwError = RegSetValueEx(hRegKey, "EventMessageFile", 0, REG_EXPAND_SZ,
- (PBYTE) szPath, strlen(szPath)+1);
+ (PBYTE) szPath, (DWORD) (strlen(szPath) + 1));
/* Register supported event types */
dwTypes= (EVENTLOG_ERROR_TYPE | EVENTLOG_WARNING_TYPE |
@@ -1738,15 +1772,16 @@ static int find_uniq_filename(char *name)
struct st_my_dir *dir_info;
reg1 struct fileinfo *file_info;
ulong max_found=0;
-
+ size_t buf_length, length;
+ char *start, *end;
DBUG_ENTER("find_uniq_filename");
- uint length = dirname_part(buff,name);
- char *start = name + length;
- char *end = strend(start);
+ length= dirname_part(buff, name, &buf_length);
+ start= name + length;
+ end= strend(start);
*end='.';
- length= (uint) (end-start+1);
+ length= (size_t) (end-start+1);
if (!(dir_info = my_dir(buff,MYF(MY_DONT_SORT))))
{ // This shouldn't happen
@@ -1756,7 +1791,7 @@ static int find_uniq_filename(char *name)
file_info= dir_info->dir_entry;
for (i=dir_info->number_off_files ; i-- ; file_info++)
{
- if (bcmp(file_info->name,start,length) == 0 &&
+ if (bcmp((uchar*) file_info->name, (uchar*) start, length) == 0 &&
test_if_number(file_info->name+length, &number,0))
{
set_if_bigger(max_found,(ulong) number);
@@ -1860,7 +1895,7 @@ bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
);
end= strnmov(buff + len, "Time Id Command Argument\n",
sizeof(buff) - len);
- if (my_b_write(&log_file, (byte*) buff, (uint) (end-buff)) ||
+ if (my_b_write(&log_file, (uchar*) buff, (uint) (end-buff)) ||
flush_io_cache(&log_file))
goto err;
}
@@ -1882,8 +1917,8 @@ shutdown the MySQL server and restart it.", name, errno);
}
MYSQL_LOG::MYSQL_LOG()
- : name(0), log_type(LOG_UNKNOWN), log_state(LOG_CLOSED), write_error(FALSE),
- inited(FALSE)
+ : name(0), write_error(FALSE), inited(FALSE), log_type(LOG_UNKNOWN),
+ log_state(LOG_CLOSED)
{
/*
We don't want to initialize LOCK_Log here as such initialization depends on
@@ -2068,30 +2103,30 @@ bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host,
start.tm_mday, start.tm_hour,
start.tm_min, start.tm_sec);
- if (my_b_write(&log_file, (byte*) &time_buff, time_buff_len))
+ if (my_b_write(&log_file, (uchar*) &time_buff, time_buff_len))
goto err;
}
else
- if (my_b_write(&log_file, (byte*) "\t\t" ,2) < 0)
+ if (my_b_write(&log_file, (uchar*) "\t\t" ,2) < 0)
goto err;
- /* command_type, thread_id */
+ /* command_type, thread_id */
length= my_snprintf(buff, 32, "%5ld ", (long) thread_id);
- if (my_b_write(&log_file, (byte*) buff, length))
+ if (my_b_write(&log_file, (uchar*) buff, length))
goto err;
- if (my_b_write(&log_file, (byte*) command_type, command_type_len))
+ if (my_b_write(&log_file, (uchar*) command_type, command_type_len))
goto err;
- if (my_b_write(&log_file, (byte*) "\t", 1))
+ if (my_b_write(&log_file, (uchar*) "\t", 1))
goto err;
/* sql_text */
- if (my_b_write(&log_file, (byte*) sql_text, sql_text_len))
+ if (my_b_write(&log_file, (uchar*) sql_text, sql_text_len))
goto err;
- if (my_b_write(&log_file, (byte*) "\n", 1) ||
+ if (my_b_write(&log_file, (uchar*) "\n", 1) ||
flush_io_cache(&log_file))
goto err;
}
@@ -2158,7 +2193,6 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
if (!(specialflag & SPECIAL_SHORT_LOG_FORMAT))
{
- Security_context *sctx= thd->security_ctx;
if (current_time != last_time)
{
last_time= current_time;
@@ -2172,7 +2206,7 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
start.tm_min, start.tm_sec);
/* Note that my_b_write() assumes it knows the length for this */
- if (my_b_write(&log_file, (byte*) buff, buff_len))
+ if (my_b_write(&log_file, (uchar*) buff, buff_len))
tmp_errno= errno;
}
if (my_b_printf(&log_file, "# User@Host: ", sizeof("# User@Host: ") - 1)
@@ -2180,7 +2214,7 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
tmp_errno= errno;
if (my_b_printf(&log_file, user_host, user_host_len) != user_host_len)
tmp_errno= errno;
- if (my_b_write(&log_file, (byte*) "\n", 1))
+ if (my_b_write(&log_file, (uchar*) "\n", 1))
tmp_errno= errno;
}
/* For slow query log */
@@ -2228,18 +2262,18 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
{
*end++=';';
*end='\n';
- if (my_b_write(&log_file, (byte*) "SET ", 4) ||
- my_b_write(&log_file, (byte*) buff + 1, (uint) (end-buff)))
+ if (my_b_write(&log_file, (uchar*) "SET ", 4) ||
+ my_b_write(&log_file, (uchar*) buff + 1, (uint) (end-buff)))
tmp_errno= errno;
}
if (is_command)
{
end= strxmov(buff, "# administrator command: ", NullS);
buff_len= (ulong) (end - buff);
- my_b_write(&log_file, (byte*) buff, buff_len);
+ my_b_write(&log_file, (uchar*) buff, buff_len);
}
- if (my_b_write(&log_file, (byte*) sql_text, sql_text_len) ||
- my_b_write(&log_file, (byte*) ";\n",2) ||
+ if (my_b_write(&log_file, (uchar*) sql_text, sql_text_len) ||
+ my_b_write(&log_file, (uchar*) ";\n",2) ||
flush_io_cache(&log_file))
tmp_errno= errno;
if (tmp_errno)
@@ -2266,7 +2300,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
TODO: The following should be using fn_format(); We just need to
first change fn_format() to cut the file name if it's too long.
*/
- strmake(buff, glob_hostname, FN_REFLEN - 5);
+ strmake(buff, pidfile_name, FN_REFLEN - 5);
strmov(fn_ext(buff), suffix);
return (const char *)buff;
}
@@ -2397,7 +2431,6 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
bool null_created_arg)
{
File file= -1;
- int open_flags = O_CREAT | O_BINARY;
DBUG_ENTER("MYSQL_BIN_LOG::open");
DBUG_PRINT("enter",("log_type: %d",(int) log_type_arg));
@@ -2424,7 +2457,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
an extension for the binary log files.
In this case we write a standard header to it.
*/
- if (my_b_safe_write(&log_file, (byte*) BINLOG_MAGIC,
+ if (my_b_safe_write(&log_file, (uchar*) BINLOG_MAGIC,
BIN_LOG_HEADER_SIZE))
goto err;
bytes_written+= BIN_LOG_HEADER_SIZE;
@@ -2476,7 +2509,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
/*
Set 'created' to 0, so that in next relay logs this event does not
trigger cleaning actions on the slave in
- Format_description_log_event::exec_event().
+ Format_description_log_event::apply_event_impl().
*/
description_event_for_queue->created= 0;
/* Don't set log_pos in event header */
@@ -2496,9 +2529,9 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
As this is a new log file, we write the file name to the index
file. As every time we write to the index file, we sync it.
*/
- if (my_b_write(&index_file, (byte*) log_file_name,
+ if (my_b_write(&index_file, (uchar*) log_file_name,
strlen(log_file_name)) ||
- my_b_write(&index_file, (byte*) "\n", 1) ||
+ my_b_write(&index_file, (uchar*) "\n", 1) ||
flush_io_cache(&index_file) ||
my_sync(index_file.file, MYF(MY_WME)))
goto err;
@@ -2559,12 +2592,14 @@ int MYSQL_BIN_LOG::raw_get_current_log(LOG_INFO* linfo)
0 ok
*/
+#ifdef HAVE_REPLICATION
+
static bool copy_up_file_and_fill(IO_CACHE *index_file, my_off_t offset)
{
int bytes_read;
my_off_t init_offset= offset;
File file= index_file->file;
- byte io_buf[IO_SIZE*2];
+ uchar io_buf[IO_SIZE*2];
DBUG_ENTER("copy_up_file_and_fill");
for (;; offset+= bytes_read)
@@ -2576,7 +2611,7 @@ static bool copy_up_file_and_fill(IO_CACHE *index_file, my_off_t offset)
if (!bytes_read)
break; // end of file
(void) my_seek(file, offset-init_offset, MY_SEEK_SET, MYF(0));
- if (my_write(file, (byte*) io_buf, bytes_read, MYF(MY_WME | MY_NABP)))
+ if (my_write(file, io_buf, bytes_read, MYF(MY_WME | MY_NABP)))
goto err;
}
/* The following will either truncate the file or fill the end with \n' */
@@ -2592,6 +2627,7 @@ err:
DBUG_RETURN(1);
}
+#endif /* HAVE_REPLICATION */
/*
Find the position in the log-index-file for the given log name
@@ -2787,7 +2823,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd)
need_start_event=1;
if (!open_index_file(index_file_name, 0))
open(save_name, log_type, 0, io_cache_type, no_auto_events, max_size, 0);
- my_free((gptr) save_name, MYF(0));
+ my_free((uchar*) save_name, MYF(0));
err:
VOID(pthread_mutex_unlock(&LOCK_thread_count));
@@ -3084,8 +3120,6 @@ err:
pthread_mutex_unlock(&LOCK_index);
DBUG_RETURN(error);
}
-
-
#endif /* HAVE_REPLICATION */
@@ -3182,8 +3216,10 @@ void MYSQL_BIN_LOG::new_file_impl(bool need_lock)
{
tc_log_page_waits++;
pthread_mutex_lock(&LOCK_prep_xids);
- while (prepared_xids)
+ while (prepared_xids) {
+ DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids);
+ }
pthread_mutex_unlock(&LOCK_prep_xids);
}
@@ -3207,7 +3243,6 @@ void MYSQL_BIN_LOG::new_file_impl(bool need_lock)
We log the whole file name for log file as the user may decide
to change base names at some point.
*/
- THD *thd = current_thd; /* may be 0 if we are reacting to SIGHUP */
Rotate_log_event r(new_name+dirname_length(new_name),
0, LOG_EVENT_OFFSET, 0);
r.write(&log_file);
@@ -3290,7 +3325,7 @@ bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...)
safe_mutex_assert_owner(&LOCK_log);
do
{
- if (my_b_append(&log_file,(byte*) buf,len))
+ if (my_b_append(&log_file,(uchar*) buf,len))
{
error= 1;
goto err;
@@ -3322,13 +3357,13 @@ bool MYSQL_BIN_LOG::flush_and_sync()
return err;
}
-void MYSQL_BIN_LOG::start_union_events(THD *thd)
+void MYSQL_BIN_LOG::start_union_events(THD *thd, query_id_t query_id_param)
{
DBUG_ASSERT(!thd->binlog_evt_union.do_union);
thd->binlog_evt_union.do_union= TRUE;
thd->binlog_evt_union.unioned_events= FALSE;
thd->binlog_evt_union.unioned_events_trans= FALSE;
- thd->binlog_evt_union.first_query_id= thd->query_id;
+ thd->binlog_evt_union.first_query_id= query_id_param;
}
void MYSQL_BIN_LOG::stop_union_events(THD *thd)
@@ -3364,7 +3399,7 @@ int THD::binlog_setup_trx_data()
open_cached_file(&trx_data->trans_log, mysql_tmpdir,
LOG_PREFIX, binlog_cache_size, MYF(MY_WME)))
{
- my_free((gptr)trx_data, MYF(MY_ALLOW_ZERO_PTR));
+ my_free((uchar*)trx_data, MYF(MY_ALLOW_ZERO_PTR));
ha_data[binlog_hton->slot]= 0;
DBUG_RETURN(1); // Didn't manage to set it up
}
@@ -3415,18 +3450,7 @@ THD::binlog_start_trans_and_stmt()
if (trx_data == NULL ||
trx_data->before_stmt_pos == MY_OFF_T_UNDEF)
{
- /*
- The call to binlog_trans_log_savepos() might create the trx_data
- structure, if it didn't exist before, so we save the position
- into an auto variable and then write it into the transaction
- data for the binary log (i.e., trx_data).
- */
- my_off_t pos= 0;
- binlog_trans_log_savepos(this, &pos);
- trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot];
-
- trx_data->before_stmt_pos= pos;
-
+ this->binlog_set_stmt_begin();
if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trans_register_ha(this, TRUE, binlog_hton);
trans_register_ha(this, FALSE, binlog_hton);
@@ -3434,6 +3458,51 @@ THD::binlog_start_trans_and_stmt()
DBUG_VOID_RETURN;
}
+void THD::binlog_set_stmt_begin() {
+ binlog_trx_data *trx_data=
+ (binlog_trx_data*) ha_data[binlog_hton->slot];
+
+ /*
+ The call to binlog_trans_log_savepos() might create the trx_data
+ structure, if it didn't exist before, so we save the position
+ into an auto variable and then write it into the transaction
+ data for the binary log (i.e., trx_data).
+ */
+ my_off_t pos= 0;
+ binlog_trans_log_savepos(this, &pos);
+ trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot];
+ trx_data->before_stmt_pos= pos;
+}
+
+int THD::binlog_flush_transaction_cache()
+{
+ DBUG_ENTER("binlog_flush_transaction_cache");
+ binlog_trx_data *trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot];
+ DBUG_PRINT("enter", ("trx_data=0x%lu", (ulong) trx_data));
+ if (trx_data)
+ DBUG_PRINT("enter", ("trx_data->before_stmt_pos=%lu",
+ (ulong) trx_data->before_stmt_pos));
+
+ /*
+ Write the transaction cache to the binary log. We don't flush and
+ sync the log file since we don't know if more will be written to
+ it. If the caller want the log file sync:ed, the caller has to do
+ it.
+
+ The transaction data is only reset upon a successful write of the
+ cache to the binary log.
+ */
+
+ if (trx_data && likely(mysql_bin_log.is_open())) {
+ if (int error= mysql_bin_log.write_cache(&trx_data->trans_log, true, true))
+ DBUG_RETURN(error);
+ trx_data->reset();
+ }
+
+ DBUG_RETURN(0);
+}
+
+
/*
Write a table map to the binary log.
*/
@@ -3627,14 +3696,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
*/
if (likely(is_open()))
{
- const char *local_db= event_info->get_db();
IO_CACHE *file= &log_file;
#ifdef HAVE_REPLICATION
/*
- In the future we need to add to the following if tests like
- "do the involved tables match (to be implemented)
- binlog_[wild_]{do|ignore}_table?" (WL#1049)"
+ In the future we need to add to the following if tests like
+ "do the involved tables match (to be implemented)
+ binlog_[wild_]{do|ignore}_table?" (WL#1049)"
*/
+ const char *local_db= event_info->get_db();
if ((thd && !(thd->options & OPTION_BIN_LOG)) ||
(!binlog_filter->db_ok(local_db)))
{
@@ -3716,7 +3785,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
nb_elements()));
/*
If the auto_increment was second in a table's index (possible with
- MyISAM or BDB) (table->next_number_key_offset != 0), such event is
+ MyISAM or BDB) (table->next_number_keypart != 0), such event is
in fact not necessary. We could avoid logging it.
*/
Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT,
@@ -3736,7 +3805,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
for (uint i= 0; i < thd->user_var_events.elements; i++)
{
BINLOG_USER_VAR_EVENT *user_var_event;
- get_dynamic(&thd->user_var_events,(gptr) &user_var_event, i);
+ get_dynamic(&thd->user_var_events,(uchar*) &user_var_event, i);
User_var_log_event e(thd, user_var_event->user_var_event->name.str,
user_var_event->user_var_event->name.length,
user_var_event->value,
@@ -3823,7 +3892,7 @@ void MYSQL_BIN_LOG::rotate_and_purge(uint flags)
#ifdef HAVE_REPLICATION
if (expire_logs_days)
{
- long purge_time= time(0) - expire_logs_days*24*60*60;
+ time_t purge_time= time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0)
purge_logs_before_date(purge_time);
}
@@ -3844,12 +3913,48 @@ uint MYSQL_BIN_LOG::next_file_id()
/*
+ Write the contents of a cache to the binary log.
+
+ SYNOPSIS
+ write_cache()
+ cache Cache to write to the binary log
+ lock_log True if the LOCK_log mutex should be aquired, false otherwise
+ sync_log True if the log should be flushed and sync:ed
+
+ DESCRIPTION
+ Write the contents of the cache to the binary log. The cache will
+ be reset as a READ_CACHE to be able to read the contents from it.
+ */
+
+int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
+{
+ Mutex_sentry sentry(lock_log ? &LOCK_log : NULL);
+
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ return ER_ERROR_ON_WRITE;
+ uint bytes= my_b_bytes_in_cache(cache);
+ do
+ {
+ if (my_b_write(&log_file, cache->read_pos, bytes))
+ return ER_ERROR_ON_WRITE;
+ cache->read_pos= cache->read_end;
+ } while ((bytes= my_b_fill(cache)));
+
+ if (sync_log)
+ flush_and_sync();
+
+ return 0; // All OK
+}
+
+/*
Write a cached log entry to the binary log
SYNOPSIS
write()
thd
cache The cache to copy to the binlog
+ commit_event The commit event to print after writing the
+ contents of the cache.
NOTE
- We only come here if there is something in the cache.
@@ -3874,8 +3979,6 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event)
if (likely(is_open())) // Should always be true
{
- uint length;
-
/*
We only bother to write to the binary log if there is anything
to write.
@@ -3909,25 +4012,12 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event)
if (qinfo.write(&log_file))
goto err;
}
- /* Read from the file used to cache the queries .*/
- if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
- goto err;
- length=my_b_bytes_in_cache(cache);
- DBUG_EXECUTE_IF("half_binlogged_transaction", length-=100;);
- do
- {
- /* Write data to the binary log file */
- if (my_b_write(&log_file, cache->read_pos, length))
- goto err;
- cache->read_pos=cache->read_end; // Mark buffer used up
- DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;);
- } while ((length=my_b_fill(cache)));
+ if ((write_error= write_cache(cache, false, false)))
+ goto err;
+
if (commit_event && commit_event->write(&log_file))
goto err;
-#ifndef DBUG_OFF
- DBUG_skip_commit:
-#endif
if (flush_and_sync())
goto err;
DBUG_EXECUTE_IF("half_binlogged_transaction", abort(););
@@ -4042,8 +4132,16 @@ void MYSQL_BIN_LOG::close(uint exiting)
if (log_file.type == WRITE_CACHE && log_type == LOG_BIN)
{
my_off_t offset= BIN_LOG_HEADER_SIZE + FLAGS_OFFSET;
- byte flags=0; // clearing LOG_EVENT_BINLOG_IN_USE_F
+ my_off_t org_position= my_tell(log_file.file, MYF(0));
+ uchar flags= 0; // clearing LOG_EVENT_BINLOG_IN_USE_F
my_pwrite(log_file.file, &flags, 1, offset, MYF(0));
+ /*
+ Restore position so that anything we have in the IO_cache is written
+ to the correct position.
+ We need the seek here, as my_pwrite() is not guaranteed to keep the
+ original position on system that doesn't support pwrite().
+ */
+ my_seek(log_file.file, org_position, MY_SEEK_SET, MYF(0));
}
/* this will cleanup IO_CACHE, sync and close the file */
@@ -4138,38 +4236,6 @@ static bool test_if_number(register const char *str,
} /* test_if_number */
-void print_buffer_to_file(enum loglevel level, const char *buffer)
-{
- time_t skr;
- struct tm tm_tmp;
- struct tm *start;
- DBUG_ENTER("print_buffer_to_file");
- DBUG_PRINT("enter",("buffer: %s", buffer));
-
- VOID(pthread_mutex_lock(&LOCK_error_log));
-
- skr=time(NULL);
- localtime_r(&skr, &tm_tmp);
- start=&tm_tmp;
-
- fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d [%s] %s\n",
- start->tm_year % 100,
- start->tm_mon+1,
- start->tm_mday,
- start->tm_hour,
- start->tm_min,
- start->tm_sec,
- (level == ERROR_LEVEL ? "ERROR" : level == WARNING_LEVEL ?
- "Warning" : "Note"),
- buffer);
-
- fflush(stderr);
-
- VOID(pthread_mutex_unlock(&LOCK_error_log));
- DBUG_VOID_RETURN;
-}
-
-
void sql_perror(const char *message)
{
#ifdef HAVE_STRERROR
@@ -4199,17 +4265,21 @@ bool flush_error_log()
(void) my_delete(err_temp, MYF(0));
if (freopen(err_temp,"a+",stdout))
{
+ int fd;
+ size_t bytes;
+ uchar buf[IO_SIZE];
+
freopen(err_temp,"a+",stderr);
(void) my_delete(err_renamed, MYF(0));
my_rename(log_error_file,err_renamed,MYF(0));
if (freopen(log_error_file,"a+",stdout))
freopen(log_error_file,"a+",stderr);
- int fd, bytes;
- char buf[IO_SIZE];
+
if ((fd = my_open(err_temp, O_RDONLY, MYF(0))) >= 0)
{
- while ((bytes = (int) my_read(fd, (byte*) buf, IO_SIZE, MYF(0))) > 0)
- my_fwrite(stderr, (byte*) buf, bytes, MYF(0));
+ while ((bytes= my_read(fd, buf, IO_SIZE, MYF(0))) &&
+ bytes != MY_FILE_ERROR)
+ my_fwrite(stderr, buf, bytes, MYF(0));
my_close(fd, MYF(0));
}
(void) my_delete(err_temp, MYF(0));
@@ -4236,23 +4306,15 @@ void MYSQL_BIN_LOG::signal_update()
}
#ifdef __NT__
-void print_buffer_to_nt_eventlog(enum loglevel level, char *buff,
- uint length, int buffLen)
+static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff,
+ size_t length, size_t buffLen)
{
HANDLE event;
- char *buffptr;
- LPCSTR *buffmsgptr;
+ char *buffptr= buff;
DBUG_ENTER("print_buffer_to_nt_eventlog");
- buffptr= buff;
- if (length > (uint)(buffLen-5))
- {
- char *newBuff= new char[length + 5];
- strcpy(newBuff, buff);
- buffptr= newBuff;
- }
- strmov(buffptr+length, "\r\n\r\n");
- buffmsgptr= (LPCSTR*) &buffptr; // Keep windows happy
+ /* Add ending CR/LF's to string, overwrite last chars if necessary */
+ strmov(buffptr+min(length, buffLen-5), "\r\n\r\n");
setup_windows_event_source();
if ((event= RegisterEventSource(NULL,"MySQL")))
@@ -4260,24 +4322,20 @@ void print_buffer_to_nt_eventlog(enum loglevel level, char *buff,
switch (level) {
case ERROR_LEVEL:
ReportEvent(event, EVENTLOG_ERROR_TYPE, 0, MSG_DEFAULT, NULL, 1, 0,
- buffmsgptr, NULL);
+ (LPCSTR*)&buffptr, NULL);
break;
case WARNING_LEVEL:
ReportEvent(event, EVENTLOG_WARNING_TYPE, 0, MSG_DEFAULT, NULL, 1, 0,
- buffmsgptr, NULL);
+ (LPCSTR*) &buffptr, NULL);
break;
case INFORMATION_LEVEL:
ReportEvent(event, EVENTLOG_INFORMATION_TYPE, 0, MSG_DEFAULT, NULL, 1,
- 0, buffmsgptr, NULL);
+ 0, (LPCSTR*) &buffptr, NULL);
break;
}
DeregisterEventSource(event);
}
- /* if we created a string buffer, then delete it */
- if (buffptr != buff)
- delete[] buffptr;
-
DBUG_VOID_RETURN;
}
#endif /* __NT__ */
@@ -4314,14 +4372,45 @@ int vprint_msg_to_log(enum loglevel level __attribute__((unused)),
DBUG_RETURN(0);
}
#else /*!EMBEDDED_LIBRARY*/
+static void print_buffer_to_file(enum loglevel level, const char *buffer)
+{
+ time_t skr;
+ struct tm tm_tmp;
+ struct tm *start;
+ DBUG_ENTER("print_buffer_to_file");
+ DBUG_PRINT("enter",("buffer: %s", buffer));
+
+ VOID(pthread_mutex_lock(&LOCK_error_log));
+
+ skr=time(NULL);
+ localtime_r(&skr, &tm_tmp);
+ start=&tm_tmp;
+
+ fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d [%s] %s\n",
+ start->tm_year % 100,
+ start->tm_mon+1,
+ start->tm_mday,
+ start->tm_hour,
+ start->tm_min,
+ start->tm_sec,
+ (level == ERROR_LEVEL ? "ERROR" : level == WARNING_LEVEL ?
+ "Warning" : "Note"),
+ buffer);
+
+ fflush(stderr);
+
+ VOID(pthread_mutex_unlock(&LOCK_error_log));
+ DBUG_VOID_RETURN;
+}
+
+
int vprint_msg_to_log(enum loglevel level, const char *format, va_list args)
{
char buff[1024];
- uint length;
+ size_t length;
DBUG_ENTER("vprint_msg_to_log");
- /* "- 5" is because of print_buffer_to_nt_eventlog() */
- length= my_vsnprintf(buff, sizeof(buff) - 5, format, args);
+ length= my_vsnprintf(buff, sizeof(buff), format, args);
print_buffer_to_file(level, buff);
#ifdef __NT__
@@ -4440,7 +4529,7 @@ int TC_LOG_MMAP::open(const char *opt_name)
goto err;
if (using_heuristic_recover())
return 1;
- if ((fd= my_create(logname, O_RDWR, 0, MYF(MY_WME))) < 0)
+ if ((fd= my_create(logname, CREATE_MODE, O_RDWR, MYF(MY_WME))) < 0)
goto err;
inited=1;
file_length= opt_tc_log_size;
@@ -4587,21 +4676,34 @@ int TC_LOG_MMAP::overflow()
}
/*
- all access to active page is serialized but it's not a problem, as
- we're assuming that fsync() will be a main bottleneck.
- That is, parallelizing writes to log pages we'll decrease number of
- threads waiting for a page, but then all these threads will be waiting
- for a fsync() anyway
+ Record that transaction XID is committed on the persistent storage
+
+ NOTES
+ This function is called in the middle of two-phase commit:
+ First all resources prepare the transaction, then tc_log->log() is called,
+ then all resources commit the transaction, then tc_log->unlog() is called.
+
+ All access to active page is serialized but it's not a problem, as
+ we're assuming that fsync() will be a main bottleneck.
+ That is, parallelizing writes to log pages we'll decrease number of
+ threads waiting for a page, but then all these threads will be waiting
+ for a fsync() anyway
+
+ IMPLEMENTATION
+ If tc_log == MYSQL_LOG then tc_log writes transaction to binlog and
+ records XID in a special Xid_log_event.
+ If tc_log = TC_LOG_MMAP then xid is written in a special memory-mapped
+ log.
RETURN
- 0 - error
- otherwise - "cookie", a number that will be passed as an argument
- to unlog() call. tc_log can define it any way it wants,
- and use for whatever purposes. TC_LOG_MMAP sets it
- to the position in memory where xid was logged to.
+ 0 Error
+ # "cookie", a number that will be passed as an argument
+ to unlog() call. tc_log can define it any way it wants,
+ and use for whatever purposes. TC_LOG_MMAP sets it
+ to the position in memory where xid was logged to.
*/
-int TC_LOG_MMAP::log(THD *thd, my_xid xid)
+int TC_LOG_MMAP::log_xid(THD *thd, my_xid xid)
{
int err;
PAGE *p;
@@ -4710,6 +4812,7 @@ int TC_LOG_MMAP::sync()
erase xid from the page, update page free space counters/pointers.
cookie points directly to the memory where xid was logged
*/
+
void TC_LOG_MMAP::unlog(ulong cookie, my_xid xid)
{
PAGE *p=pages+(cookie/tc_log_page_size);
@@ -4750,9 +4853,9 @@ void TC_LOG_MMAP::close()
pthread_cond_destroy(&pages[i].cond);
}
case 3:
- my_free((gptr)pages, MYF(0));
+ my_free((uchar*)pages, MYF(0));
case 2:
- my_munmap((byte*)data, (size_t)file_length);
+ my_munmap((char*)data, (size_t)file_length);
case 1:
my_close(fd, MYF(0));
}
@@ -4786,13 +4889,13 @@ int TC_LOG_MMAP::recover()
}
if (hash_init(&xids, &my_charset_bin, tc_log_page_size/3, 0,
- sizeof(my_xid), 0, 0, MYF(0)))
+ sizeof(my_xid), 0, 0, MYF(0)))
goto err1;
for ( ; p < end_p ; p++)
{
for (my_xid *x=p->start; x < p->end; x++)
- if (*x && my_hash_insert(&xids, (byte *)x))
+ if (*x && my_hash_insert(&xids, (uchar *)x))
goto err2; // OOM
}
@@ -4952,7 +5055,7 @@ void TC_LOG_BINLOG::close()
0 - error
1 - success
*/
-int TC_LOG_BINLOG::log(THD *thd, my_xid xid)
+int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid)
{
DBUG_ENTER("TC_LOG_BINLOG::log");
Xid_log_event xle(thd, xid);
@@ -4969,8 +5072,10 @@ void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
{
pthread_mutex_lock(&LOCK_prep_xids);
DBUG_ASSERT(prepared_xids > 0);
- if (--prepared_xids == 0)
+ if (--prepared_xids == 0) {
+ DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
pthread_cond_signal(&COND_prep_xids);
+ }
pthread_mutex_unlock(&LOCK_prep_xids);
rotate_and_purge(0); // as ::write() did not rotate
}
@@ -4983,7 +5088,7 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
if (! fdle->is_valid() ||
hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
- sizeof(my_xid), 0, 0, MYF(0)))
+ sizeof(my_xid), 0, 0, MYF(0)))
goto err1;
init_alloc_root(&mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE);
@@ -4995,8 +5100,8 @@ int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
if (ev->get_type_code() == XID_EVENT)
{
Xid_log_event *xev=(Xid_log_event *)ev;
- byte *x=(byte *)memdup_root(&mem_root, (char *)& xev->xid,
- sizeof(xev->xid));
+ uchar *x= (uchar *) memdup_root(&mem_root, (uchar*) &xev->xid,
+ sizeof(xev->xid));
if (! x)
goto err2;
my_hash_insert(&xids, x);