Diffstat (limited to 'sql/log.cc')
 -rw-r--r--  sql/log.cc | 427
 1 file changed, 346 insertions, 81 deletions
diff --git a/sql/log.cc b/sql/log.cc
index d30cf3266f9..44d3869e9d5 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -47,6 +47,19 @@ static int binlog_commit(THD *thd, bool all);
static int binlog_rollback(THD *thd, bool all);
static int binlog_prepare(THD *thd, bool all);
+/*
+ This is a POD. Please keep it that way!
+
+ Don't add constructors, destructors, or virtual functions.
+*/
+struct binlog_trx_data {
+ bool empty() const {
+ return pending == NULL && my_b_tell(&trans_log) == 0;
+ }
+ IO_CACHE trans_log; // The transaction cache
+ Rows_log_event *pending; // The pending binrows event
+};
+
handlerton binlog_hton = {
MYSQL_HANDLERTON_INTERFACE_VERSION,
"binlog",
@@ -92,19 +105,45 @@ bool binlog_init()
static int binlog_close_connection(THD *thd)
{
- IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot];
- DBUG_ASSERT(mysql_bin_log.is_open() && !my_b_tell(trans_log));
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) thd->ha_data[binlog_hton.slot];
+ IO_CACHE *trans_log= &trx_data->trans_log;
+ DBUG_ASSERT(mysql_bin_log.is_open() && trx_data->empty());
close_cached_file(trans_log);
- my_free((gptr)trans_log, MYF(0));
+ thd->ha_data[binlog_hton.slot]= 0;
+ my_free((gptr)trx_data, MYF(0));
return 0;
}
-static int binlog_end_trans(THD *thd, IO_CACHE *trans_log, Log_event *end_ev)
+static int
+binlog_end_trans(THD *thd, binlog_trx_data *trx_data, Log_event *end_ev)
{
- int error=0;
DBUG_ENTER("binlog_end_trans");
+ int error=0;
+ IO_CACHE *trans_log= &trx_data->trans_log;
+
if (end_ev)
+ {
+ thd->binlog_flush_pending_rows_event(true);
error= mysql_bin_log.write(thd, trans_log, end_ev);
+ }
+ else
+ {
+ thd->binlog_delete_pending_rows_event();
+ }
+
+ /*
+ We need to step the table map version both after writing the
+ entire transaction to the log file and after rolling back the
+ transaction.
+
+ We need to step the table map version after writing the
+ transaction cache to disk. In addition, we need to step the table
+ map version on a rollback to ensure that a new table map event is
+ generated instead of the one that was written to the thrown-away
+ transaction cache.
+ */
+ ++mysql_bin_log.m_table_map_version;
statistic_increment(binlog_cache_use, &LOCK_status);
if (trans_log->disk_writes != 0)
@@ -130,32 +169,36 @@ static int binlog_prepare(THD *thd, bool all)
static int binlog_commit(THD *thd, bool all)
{
- IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot];
DBUG_ENTER("binlog_commit");
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) thd->ha_data[binlog_hton.slot];
+ IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_ASSERT(mysql_bin_log.is_open() &&
(all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))));
- if (!my_b_tell(trans_log))
+ if (trx_data->empty())
{
// we're here because trans_log was flushed in MYSQL_LOG::log()
DBUG_RETURN(0);
}
Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE);
- DBUG_RETURN(binlog_end_trans(thd, trans_log, &qev));
+ DBUG_RETURN(binlog_end_trans(thd, trx_data, &qev));
}
static int binlog_rollback(THD *thd, bool all)
{
- int error=0;
- IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot];
DBUG_ENTER("binlog_rollback");
+ int error=0;
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) thd->ha_data[binlog_hton.slot];
+ IO_CACHE *trans_log= &trx_data->trans_log;
/*
First assert is guaranteed - see trans_register_ha() call below.
The second must be true. If it is not, we're registering
unnecessary, doing extra work. The cause should be found and eliminated
*/
DBUG_ASSERT(all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)));
- DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log));
+ DBUG_ASSERT(mysql_bin_log.is_open() && !trx_data->empty());
/*
Update the binary log with a BEGIN/ROLLBACK block if we have
cached some queries and we updated some non-transactional
@@ -165,10 +208,10 @@ static int binlog_rollback(THD *thd, bool all)
if (unlikely(thd->options & OPTION_STATUS_NO_TRANS_UPDATE))
{
Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE);
- error= binlog_end_trans(thd, trans_log, &qev);
+ error= binlog_end_trans(thd, trx_data, &qev);
}
else
- error= binlog_end_trans(thd, trans_log, 0);
+ error= binlog_end_trans(thd, trx_data, 0);
DBUG_RETURN(error);
}
@@ -195,8 +238,10 @@ static int binlog_rollback(THD *thd, bool all)
static int binlog_savepoint_set(THD *thd, void *sv)
{
- IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot];
DBUG_ENTER("binlog_savepoint_set");
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) thd->ha_data[binlog_hton.slot];
+ IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log));
*(my_off_t *)sv= my_b_tell(trans_log);
@@ -207,8 +252,10 @@ static int binlog_savepoint_set(THD *thd, void *sv)
static int binlog_savepoint_rollback(THD *thd, void *sv)
{
- IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot];
DBUG_ENTER("binlog_savepoint_rollback");
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) thd->ha_data[binlog_hton.slot];
+ IO_CACHE *trans_log= &trx_data->trans_log;
DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log));
/*
@@ -367,6 +414,7 @@ MYSQL_LOG::MYSQL_LOG()
:bytes_written(0), last_time(0), query_start(0), name(0),
prepared_xids(0), log_type(LOG_CLOSED), file_id(1), open_count(1),
write_error(FALSE), inited(FALSE), need_start_event(TRUE),
+ m_table_map_version(0),
description_event_for_exec(0), description_event_for_queue(0)
{
/*
@@ -1363,7 +1411,7 @@ void MYSQL_LOG::new_file(bool need_lock)
to change base names at some point.
*/
THD *thd = current_thd; /* may be 0 if we are reacting to SIGHUP */
- Rotate_log_event r(thd,new_name+dirname_length(new_name),
+ Rotate_log_event r(new_name+dirname_length(new_name),
0, LOG_EVENT_OFFSET, 0);
r.write(&log_file);
bytes_written += r.data_written;
@@ -1589,6 +1637,162 @@ bool MYSQL_LOG::is_query_in_union(THD *thd, query_id_t query_id_param)
query_id_param >= thd->binlog_evt_union.first_query_id);
}
+
+/*
+ These functions are placed in this file since they need access to
+ binlog_hton, which has internal linkage.
+*/
+
+int THD::binlog_setup_trx_data()
+{
+ DBUG_ENTER("THD::binlog_setup_trx_data");
+ binlog_trx_data *trx_data=
+ (binlog_trx_data*) ha_data[binlog_hton.slot];
+
+ if (trx_data)
+ DBUG_RETURN(0); // Already set up
+
+ ha_data[binlog_hton.slot]= trx_data=
+ (binlog_trx_data*) my_malloc(sizeof(binlog_trx_data), MYF(MY_ZEROFILL));
+ if (!trx_data ||
+ open_cached_file(&trx_data->trans_log, mysql_tmpdir,
+ LOG_PREFIX, binlog_cache_size, MYF(MY_WME)))
+ {
+ my_free((gptr)trx_data, MYF(MY_ALLOW_ZERO_PTR));
+ ha_data[binlog_hton.slot]= 0;
+ DBUG_RETURN(1); // Didn't manage to set it up
+ }
+ trx_data->trans_log.end_of_file= max_binlog_cache_size;
+ DBUG_RETURN(0);
+}
+
+Rows_log_event*
+THD::binlog_get_pending_rows_event() const
+{
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) ha_data[binlog_hton.slot];
+ /*
+ This is less than ideal, but here's the story: If there is no
+ trx_data, prepare_pending_rows_event() has never been called
+ (since the trx_data is set up there). In that case, we just return
+ NULL.
+ */
+ return trx_data ? trx_data->pending : NULL;
+}
+
+void
+THD::binlog_set_pending_rows_event(Rows_log_event* ev)
+{
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) ha_data[binlog_hton.slot];
+ DBUG_ASSERT(trx_data);
+ trx_data->pending= ev;
+}
+
+
+/*
+ Moves the last bunch of rows from the pending Rows event to the binlog
+ (either cached binlog if transaction, or disk binlog). Sets a new pending
+ event.
+*/
+int MYSQL_LOG::flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event)
+{
+ DBUG_ENTER("MYSQL_LOG::flush_and_set_pending_rows_event(event)");
+ DBUG_ASSERT(binlog_row_based && mysql_bin_log.is_open());
+ DBUG_PRINT("enter", ("event=%p", event));
+
+ int error= 0;
+
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) thd->ha_data[binlog_hton.slot];
+
+ DBUG_ASSERT(trx_data);
+
+ if (Rows_log_event* pending= trx_data->pending)
+ {
+ IO_CACHE *file= &log_file;
+
+ /*
+ Decide if we should write to the log file directly or to the
+ transaction log.
+ */
+ if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log))
+ file= &trx_data->trans_log;
+
+ /*
+ If we are writing to the log file directly, we could avoid
+ locking the log. This does not work since we need to step the
+ m_table_map_version below, and that change has to be protected
+ by the LOCK_log mutex.
+ */
+ pthread_mutex_lock(&LOCK_log);
+
+ /*
+ Write a table map if necessary
+ */
+ if (pending->maybe_write_table_map(thd, file, this))
+ {
+ pthread_mutex_unlock(&LOCK_log);
+ DBUG_RETURN(2);
+ }
+
+ /*
+ Write pending event to log file or transaction cache
+ */
+ if (pending->write(file))
+ {
+ pthread_mutex_unlock(&LOCK_log);
+ DBUG_RETURN(1);
+ }
+
+ /*
+ We step the table map version if we are writing an event
+ representing the end of a statement. We do this regardless of
+ whether we write to the transaction cache or directly to the
+ file.
+
+ In an ideal world, we could avoid stepping the table map version
+ if we were writing to a transaction cache, since we could then
+ reuse the table map that was written earlier in the transaction
+ cache. This does not work since STMT_END_F implies closing all
+ table mappings on the slave side.
+
+ TODO: Find a solution so that table maps do not have to be
+ written several times within a transaction.
+ */
+ if (pending->get_flags(Rows_log_event::STMT_END_F))
+ ++m_table_map_version;
+
+ delete pending;
+
+ if (file == &log_file)
+ {
+ error= flush_and_sync();
+ if (!error)
+ {
+ signal_update();
+ rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
+ }
+ }
+
+ pthread_mutex_unlock(&LOCK_log);
+ }
+ else if (event && event->get_cache_stmt()) /* && pending == 0 */
+ {
+ /*
+ If we are setting a non-null event for a table that is
+ transactional, we start a transaction here as well.
+ */
+ trans_register_ha(thd,
+ thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN),
+ &binlog_hton);
+ }
+
+ trx_data->pending= event;
+
+ DBUG_RETURN(error);
+}
+
/*
Write an event to the binary log
*/
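
MYSQL_LOG::flush_and_set_pending_rows_event() above is driven through THD-level helpers (binlog_flush_pending_rows_event(), binlog_delete_pending_rows_event()) whose definitions are not part of this file. A simplified, hypothetical sketch of how such a wrapper could forward to it — the real wrapper lives outside this diff and also deals with the STMT_END_F flag, which is omitted here:

    int THD::binlog_flush_pending_rows_event(bool stmt_end)
    {
      /* Only meaningful when row-based binlogging is active. */
      if (!binlog_row_based || !mysql_bin_log.is_open())
        return 0;
      /*
        Passing a NULL event flushes whatever rows event is pending and
        leaves no new pending event behind.
      */
      return mysql_bin_log.flush_and_set_pending_rows_event(this, 0);
    }
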
@@ -1609,7 +1813,29 @@ bool MYSQL_LOG::write(Log_event *event_info)
thd->binlog_evt_union.unioned_events_trans |= event_info->cache_stmt;
DBUG_RETURN(0);
}
-
+
+ /*
+ Flush the pending rows event to the transaction cache or to the
+ log file. Since this function potentially acquires the LOCK_log
+ mutex, we do this before acquiring the LOCK_log mutex in this
+ function.
+
+ This is not optimal, but necessary in the current implementation
+ since there is code that writes rows to system tables without
+ flushing the pending event first (e.g., binlog_query()).
+
+ TODO: There shall be no writes to any system table after calling
+ binlog_query(), so these writes have to be moved to before the call
+ of binlog_query() for correct functioning.
+
+ This is necessary not only for RBR: the master might also crash
+ after binlogging the query but before changing the system tables.
+ This means that the slave and the master are not in the same state
+ (after the master has restarted), so we have to
+ eliminate this problem.
+ */
+ thd->binlog_flush_pending_rows_event(true);
+
pthread_mutex_lock(&LOCK_log);
/*
@@ -1649,37 +1875,26 @@ bool MYSQL_LOG::write(Log_event *event_info)
*/
if (opt_using_transactions && thd)
{
- IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot];
+ if (thd->binlog_setup_trx_data())
+ goto err;
- if (event_info->get_cache_stmt())
- {
- if (!trans_log)
- {
- thd->ha_data[binlog_hton.slot]= trans_log= (IO_CACHE *)
- my_malloc(sizeof(IO_CACHE), MYF(MY_ZEROFILL));
- if (!trans_log || open_cached_file(trans_log, mysql_tmpdir,
- LOG_PREFIX,
- binlog_cache_size, MYF(MY_WME)))
- {
- my_free((gptr)trans_log, MYF(MY_ALLOW_ZERO_PTR));
- thd->ha_data[binlog_hton.slot]= trans_log= 0;
- goto err;
- }
- trans_log->end_of_file= max_binlog_cache_size;
- trans_register_ha(thd,
- thd->options & (OPTION_NOT_AUTOCOMMIT |
- OPTION_BEGIN),
- &binlog_hton);
- }
- else if (!my_b_tell(trans_log))
- trans_register_ha(thd,
- thd->options & (OPTION_NOT_AUTOCOMMIT |
- OPTION_BEGIN),
- &binlog_hton);
- file= trans_log;
- }
- else if (trans_log && my_b_tell(trans_log))
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) thd->ha_data[binlog_hton.slot];
+ IO_CACHE *trans_log= &trx_data->trans_log;
+
+ if (event_info->get_cache_stmt() && !my_b_tell(trans_log))
+ trans_register_ha(thd,
+ thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN),
+ &binlog_hton);
+
+ if (event_info->get_cache_stmt() || my_b_tell(trans_log))
file= trans_log;
+ /*
+ Note: as Mats suggested, for all the cases above where we write to
+ trans_log, it sounds unnecessary to lock LOCK_log. We should rather
+ test first if we want to write to trans_log, and if not, lock
+ LOCK_log. TODO.
+ */
}
#endif
DBUG_PRINT("info",("event type=%d",event_info->get_type_code()));
@@ -1694,42 +1909,49 @@ bool MYSQL_LOG::write(Log_event *event_info)
of the SQL command
*/
+ /*
+ If row-based binlogging is used, Insert_id, Rand, and other kinds of
+ "setting context" events are not needed.
+ */
if (thd)
{
- if (thd->last_insert_id_used)
+ if (!binlog_row_based)
{
- Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
- thd->current_insert_id);
- if (e.write(file))
- goto err;
- }
- if (thd->insert_id_used)
- {
- Intvar_log_event e(thd,(uchar) INSERT_ID_EVENT,thd->last_insert_id);
- if (e.write(file))
- goto err;
- }
- if (thd->rand_used)
- {
- Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2);
- if (e.write(file))
- goto err;
- }
- if (thd->user_var_events.elements)
- {
- for (uint i= 0; i < thd->user_var_events.elements; i++)
- {
- BINLOG_USER_VAR_EVENT *user_var_event;
- get_dynamic(&thd->user_var_events,(gptr) &user_var_event, i);
- User_var_log_event e(thd, user_var_event->user_var_event->name.str,
- user_var_event->user_var_event->name.length,
- user_var_event->value,
- user_var_event->length,
- user_var_event->type,
- user_var_event->charset_number);
- if (e.write(file))
- goto err;
- }
+ if (thd->last_insert_id_used)
+ {
+ Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
+ thd->current_insert_id);
+ if (e.write(file))
+ goto err;
+ }
+ if (thd->insert_id_used)
+ {
+ Intvar_log_event e(thd,(uchar) INSERT_ID_EVENT,thd->last_insert_id);
+ if (e.write(file))
+ goto err;
+ }
+ if (thd->rand_used)
+ {
+ Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2);
+ if (e.write(file))
+ goto err;
+ }
+ if (thd->user_var_events.elements)
+ {
+ for (uint i= 0; i < thd->user_var_events.elements; i++)
+ {
+ BINLOG_USER_VAR_EVENT *user_var_event;
+ get_dynamic(&thd->user_var_events,(gptr) &user_var_event, i);
+ User_var_log_event e(thd, user_var_event->user_var_event->name.str,
+ user_var_event->user_var_event->name.length,
+ user_var_event->value,
+ user_var_event->length,
+ user_var_event->type,
+ user_var_event->charset_number);
+ if (e.write(file))
+ goto err;
+ }
+ }
}
}
@@ -1760,6 +1982,9 @@ err:
}
}
+ if (event_info->flags & LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F)
+ ++m_table_map_version;
+
pthread_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
@@ -2307,6 +2532,44 @@ void MYSQL_LOG::signal_update()
DBUG_VOID_RETURN;
}
+#ifndef MYSQL_CLIENT
+bool MYSQL_LOG::write_table_map(THD *thd, IO_CACHE *file, TABLE* table,
+ bool is_transactional)
+{
+ DBUG_ENTER("MYSQL_LOG::write_table_map()");
+ DBUG_PRINT("enter", ("table=%p (%s: %u)",
+ table, table->s->table_name, table->s->table_map_id));
+
+ /* Pre-conditions */
+ DBUG_ASSERT(binlog_row_based && is_open());
+ DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
+
+#ifndef DBUG_OFF
+ /*
+ We only need to execute under the LOCK_log mutex if we are writing
+ to the log file; otherwise, we are writing to a thread-specific
+ transaction cache and there is no need to serialize this event
+ with events in other threads.
+ */
+ if (file == &log_file)
+ safe_mutex_assert_owner(&LOCK_log);
+#endif
+
+ Table_map_log_event::flag_set const
+ flags= Table_map_log_event::NO_FLAGS;
+
+ Table_map_log_event
+ the_event(thd, table, table->s->table_map_id, is_transactional, flags);
+
+ if (the_event.write(file))
+ DBUG_RETURN(1);
+
+ table->s->table_map_version= m_table_map_version;
+ DBUG_RETURN(0);
+}
+#endif /* !defined(MYSQL_CLIENT) */
+
+
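
write_table_map() stamps the shared table definition with the log's current table map version (table->s->table_map_version= m_table_map_version). Its caller, Rows_log_event::maybe_write_table_map(), is not part of this diff; presumably it compares the two versions to decide whether a fresh Table_map_log_event must precede the rows event. A hypothetical sketch of that check — the member name m_table and the exact signature are assumptions:

    bool Rows_log_event::maybe_write_table_map(THD *thd, IO_CACHE *file,
                                               MYSQL_LOG *log) const
    {
      /*
        If the table was already mapped under the log's current table
        map version, the existing mapping is still valid and nothing
        needs to be written.
      */
      if (m_table->s->table_map_version == log->m_table_map_version)
        return 0;
      /*
        Otherwise emit a new table map; write_table_map() also stamps
        the table with the current version.
      */
      return log->write_table_map(thd, file, m_table, get_cache_stmt());
    }
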
#ifdef __NT__
void print_buffer_to_nt_eventlog(enum loglevel level, char *buff,
uint length, int buffLen)
@@ -3013,9 +3276,11 @@ void TC_LOG_BINLOG::close()
*/
int TC_LOG_BINLOG::log(THD *thd, my_xid xid)
{
+ DBUG_ENTER("TC_LOG_BINLOG::log");
Xid_log_event xle(thd, xid);
- IO_CACHE *trans_log= (IO_CACHE*)thd->ha_data[binlog_hton.slot];
- return !binlog_end_trans(thd, trans_log, &xle); // invert return value
+ binlog_trx_data *trx_data=
+ (binlog_trx_data*) thd->ha_data[binlog_hton.slot];
+ DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle)); // invert return value
}
void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)