summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc186
1 files changed, 129 insertions, 57 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 150c4a58c63..1b432ca15c0 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -81,23 +81,54 @@ char *make_default_log_name(char *buff,const char* log_ext)
}
/*
+ Helper class to hold a mutex for the duration of the
+ block.
+
+ Eliminates the need for explicit unlocking of mutexes on, e.g.,
+ error returns. On passing a null pointer, the sentry will not do
+ anything.
+ */
+class Mutex_sentry
+{
+public:
+ Mutex_sentry(pthread_mutex_t *mutex)
+ : m_mutex(mutex)
+ {
+ if (m_mutex)
+ pthread_mutex_lock(mutex);
+ }
+
+ ~Mutex_sentry()
+ {
+ if (m_mutex)
+ pthread_mutex_unlock(m_mutex);
+#ifndef DBUG_OFF
+ m_mutex= 0;
+#endif
+ }
+
+private:
+ pthread_mutex_t *m_mutex;
+
+ // It's not allowed to copy this object in any way
+ Mutex_sentry(Mutex_sentry const&);
+ void operator=(Mutex_sentry const&);
+};
+
+/*
Helper class to store binary log transaction data.
*/
class binlog_trx_data {
public:
binlog_trx_data()
-#ifdef HAVE_ROW_BASED_REPLICATION
: m_pending(0), before_stmt_pos(MY_OFF_T_UNDEF)
-#endif
{
trans_log.end_of_file= max_binlog_cache_size;
}
~binlog_trx_data()
{
-#ifdef HAVE_ROW_BASED_REPLICATION
DBUG_ASSERT(pending() == NULL);
-#endif
close_cached_file(&trans_log);
}
@@ -107,11 +138,7 @@ public:
bool empty() const
{
-#ifdef HAVE_ROW_BASED_REPLICATION
return pending() == NULL && my_b_tell(&trans_log) == 0;
-#else
- return my_b_tell(&trans_log) == 0;
-#endif
}
/*
@@ -120,11 +147,13 @@ public:
*/
void truncate(my_off_t pos)
{
-#ifdef HAVE_ROW_BASED_REPLICATION
+ DBUG_PRINT("info", ("truncating to position %lu", pos));
+ DBUG_PRINT("info", ("before_stmt_pos=%lu", pos));
delete pending();
set_pending(0);
-#endif
reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0);
+ if (pos < before_stmt_pos)
+ before_stmt_pos= MY_OFF_T_UNDEF;
}
/*
@@ -134,13 +163,10 @@ public:
void reset() {
if (!empty())
truncate(0);
-#ifdef HAVE_ROW_BASED_REPLICATION
before_stmt_pos= MY_OFF_T_UNDEF;
-#endif
trans_log.end_of_file= max_binlog_cache_size;
}
-#ifdef HAVE_ROW_BASED_REPLICATION
Rows_log_event *pending() const
{
return m_pending;
@@ -150,12 +176,10 @@ public:
{
m_pending= pending;
}
-#endif
IO_CACHE trans_log; // The transaction cache
private:
-#ifdef HAVE_ROW_BASED_REPLICATION
/*
Pending binrows event. This event is the event where the rows are
currently written.
@@ -167,7 +191,6 @@ public:
Binlog position before the start of the current statement.
*/
my_off_t before_stmt_pos;
-#endif
};
handlerton *binlog_hton;
@@ -1467,9 +1490,7 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
were, we would have to ensure that we're not ending a statement
inside a stored function.
*/
-#ifdef HAVE_ROW_BASED_REPLICATION
thd->binlog_flush_pending_rows_event(TRUE);
-#endif
/*
We write the transaction cache to the binary log if either we're
committing the entire transaction, or if we are doing an
@@ -1479,13 +1500,11 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
{
error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev);
trx_data->reset();
-#ifdef HAVE_ROW_BASED_REPLICATION
/*
We need to step the table map version after writing the
transaction cache to disk.
*/
mysql_bin_log.update_table_map_version();
-#endif
statistic_increment(binlog_cache_use, &LOCK_status);
if (trans_log->disk_writes != 0)
{
@@ -1494,7 +1513,6 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
}
}
}
-#ifdef HAVE_ROW_BASED_REPLICATION
else
{
/*
@@ -1503,12 +1521,11 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
If rolling back a statement in a transaction, we truncate the
transaction cache to remove the statement.
-
*/
if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
trx_data->reset();
- else
- trx_data->truncate(trx_data->before_stmt_pos); // ...statement
+ else // ...statement
+ trx_data->truncate(trx_data->before_stmt_pos);
/*
We need to step the table map version on a rollback to ensure
@@ -1517,7 +1534,6 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
*/
mysql_bin_log.update_table_map_version();
}
-#endif
DBUG_RETURN(error);
}
@@ -2096,7 +2112,7 @@ bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host,
if (my_b_write(&log_file, (byte*) "\t\t" ,2) < 0)
goto err;
- /* command_type, thread_id */
+ /* command_type, thread_id */
length= my_snprintf(buff, 32, "%5ld ", (long) thread_id);
if (my_b_write(&log_file, (byte*) buff, length))
@@ -3395,7 +3411,6 @@ int THD::binlog_setup_trx_data()
DBUG_RETURN(0);
}
-#ifdef HAVE_ROW_BASED_REPLICATION
/*
Function to start a statement and optionally a transaction for the
binary log.
@@ -3437,18 +3452,7 @@ THD::binlog_start_trans_and_stmt()
if (trx_data == NULL ||
trx_data->before_stmt_pos == MY_OFF_T_UNDEF)
{
- /*
- The call to binlog_trans_log_savepos() might create the trx_data
- structure, if it didn't exist before, so we save the position
- into an auto variable and then write it into the transaction
- data for the binary log (i.e., trx_data).
- */
- my_off_t pos= 0;
- binlog_trans_log_savepos(this, &pos);
- trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot];
-
- trx_data->before_stmt_pos= pos;
-
+ this->binlog_set_stmt_begin();
if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trans_register_ha(this, TRUE, binlog_hton);
trans_register_ha(this, FALSE, binlog_hton);
@@ -3456,6 +3460,51 @@ THD::binlog_start_trans_and_stmt()
DBUG_VOID_RETURN;
}
+void THD::binlog_set_stmt_begin() {
+ binlog_trx_data *trx_data=
+ (binlog_trx_data*) ha_data[binlog_hton->slot];
+
+ /*
+ The call to binlog_trans_log_savepos() might create the trx_data
+ structure, if it didn't exist before, so we save the position
+ into an auto variable and then write it into the transaction
+ data for the binary log (i.e., trx_data).
+ */
+ my_off_t pos= 0;
+ binlog_trans_log_savepos(this, &pos);
+ trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot];
+ trx_data->before_stmt_pos= pos;
+}
+
+int THD::binlog_flush_transaction_cache()
+{
+ DBUG_ENTER("binlog_flush_transaction_cache");
+ binlog_trx_data *trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot];
+ DBUG_PRINT("enter", ("trx_data=0x%lu", trx_data));
+ if (trx_data)
+ DBUG_PRINT("enter", ("trx_data->before_stmt_pos=%u",
+ trx_data->before_stmt_pos));
+
+ /*
+ Write the transaction cache to the binary log. We don't flush and
+ sync the log file since we don't know if more will be written to
+ it. If the caller want the log file sync:ed, the caller has to do
+ it.
+
+ The transaction data is only reset upon a successful write of the
+ cache to the binary log.
+ */
+
+ if (trx_data && likely(mysql_bin_log.is_open())) {
+ if (int error= mysql_bin_log.write_cache(&trx_data->trans_log, true, true))
+ DBUG_RETURN(error);
+ trx_data->reset();
+ }
+
+ DBUG_RETURN(0);
+}
+
+
/*
Write a table map to the binary log.
*/
@@ -3604,7 +3653,6 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
DBUG_RETURN(error);
}
-#endif /*HAVE_ROW_BASED_REPLICATION*/
/*
Write an event to the binary log
@@ -3637,11 +3685,9 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
we are inside a stored function, we do not end the statement since
this will close all tables on the slave.
*/
-#ifdef HAVE_ROW_BASED_REPLICATION
bool const end_stmt=
thd->prelocked_mode && thd->lex->requires_prelocking();
thd->binlog_flush_pending_rows_event(end_stmt);
-#endif /*HAVE_ROW_BASED_REPLICATION*/
pthread_mutex_lock(&LOCK_log);
@@ -3671,7 +3717,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
}
#endif /* HAVE_REPLICATION */
-#if defined(USING_TRANSACTIONS) && defined(HAVE_ROW_BASED_REPLICATION)
+#if defined(USING_TRANSACTIONS)
/*
Should we write to the binlog cache or to the binlog on disk?
Write to the binlog cache if:
@@ -3706,7 +3752,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
LOCK_log.
*/
}
-#endif /* USING_TRANSACTIONS && HAVE_ROW_BASED_REPLICATION */
+#endif /* USING_TRANSACTIONS */
DBUG_PRINT("info",("event type: %d",event_info->get_type_code()));
/*
@@ -3869,12 +3915,48 @@ uint MYSQL_BIN_LOG::next_file_id()
/*
+ Write the contents of a cache to the binary log.
+
+ SYNOPSIS
+ write_cache()
+ cache Cache to write to the binary log
+ lock_log True if the LOCK_log mutex should be aquired, false otherwise
+ sync_log True if the log should be flushed and sync:ed
+
+ DESCRIPTION
+ Write the contents of the cache to the binary log. The cache will
+ be reset as a READ_CACHE to be able to read the contents from it.
+ */
+
+int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
+{
+ Mutex_sentry sentry(lock_log ? &LOCK_log : NULL);
+
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ return ER_ERROR_ON_WRITE;
+ uint bytes= my_b_bytes_in_cache(cache);
+ do
+ {
+ if (my_b_write(&log_file, cache->read_pos, bytes))
+ return ER_ERROR_ON_WRITE;
+ cache->read_pos= cache->read_end;
+ } while ((bytes= my_b_fill(cache)));
+
+ if (sync_log)
+ flush_and_sync();
+
+ return 0; // All OK
+}
+
+/*
Write a cached log entry to the binary log
SYNOPSIS
write()
thd
cache The cache to copy to the binlog
+ commit_event The commit event to print after writing the
+ contents of the cache.
NOTE
- We only come here if there is something in the cache.
@@ -3934,20 +4016,10 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event)
if (qinfo.write(&log_file))
goto err;
}
- /* Read from the file used to cache the queries .*/
- if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
- goto err;
- length=my_b_bytes_in_cache(cache);
- DBUG_EXECUTE_IF("half_binlogged_transaction", length-=100;);
- do
- {
- /* Write data to the binary log file */
- if (my_b_write(&log_file, cache->read_pos, length))
- goto err;
- cache->read_pos=cache->read_end; // Mark buffer used up
- DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;);
- } while ((length=my_b_fill(cache)));
+ if ((write_error= write_cache(cache, false, false)))
+ goto err;
+
if (commit_event && commit_event->write(&log_file))
goto err;
#ifndef DBUG_OFF