diff options
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 186 |
1 files changed, 129 insertions, 57 deletions
diff --git a/sql/log.cc b/sql/log.cc index 150c4a58c63..1b432ca15c0 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -81,23 +81,54 @@ char *make_default_log_name(char *buff,const char* log_ext) } /* + Helper class to hold a mutex for the duration of the + block. + + Eliminates the need for explicit unlocking of mutexes on, e.g., + error returns. On passing a null pointer, the sentry will not do + anything. + */ +class Mutex_sentry +{ +public: + Mutex_sentry(pthread_mutex_t *mutex) + : m_mutex(mutex) + { + if (m_mutex) + pthread_mutex_lock(mutex); + } + + ~Mutex_sentry() + { + if (m_mutex) + pthread_mutex_unlock(m_mutex); +#ifndef DBUG_OFF + m_mutex= 0; +#endif + } + +private: + pthread_mutex_t *m_mutex; + + // It's not allowed to copy this object in any way + Mutex_sentry(Mutex_sentry const&); + void operator=(Mutex_sentry const&); +}; + +/* Helper class to store binary log transaction data. */ class binlog_trx_data { public: binlog_trx_data() -#ifdef HAVE_ROW_BASED_REPLICATION : m_pending(0), before_stmt_pos(MY_OFF_T_UNDEF) -#endif { trans_log.end_of_file= max_binlog_cache_size; } ~binlog_trx_data() { -#ifdef HAVE_ROW_BASED_REPLICATION DBUG_ASSERT(pending() == NULL); -#endif close_cached_file(&trans_log); } @@ -107,11 +138,7 @@ public: bool empty() const { -#ifdef HAVE_ROW_BASED_REPLICATION return pending() == NULL && my_b_tell(&trans_log) == 0; -#else - return my_b_tell(&trans_log) == 0; -#endif } /* @@ -120,11 +147,13 @@ public: */ void truncate(my_off_t pos) { -#ifdef HAVE_ROW_BASED_REPLICATION + DBUG_PRINT("info", ("truncating to position %lu", pos)); + DBUG_PRINT("info", ("before_stmt_pos=%lu", pos)); delete pending(); set_pending(0); -#endif reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0); + if (pos < before_stmt_pos) + before_stmt_pos= MY_OFF_T_UNDEF; } /* @@ -134,13 +163,10 @@ public: void reset() { if (!empty()) truncate(0); -#ifdef HAVE_ROW_BASED_REPLICATION before_stmt_pos= MY_OFF_T_UNDEF; -#endif trans_log.end_of_file= max_binlog_cache_size; } -#ifdef HAVE_ROW_BASED_REPLICATION Rows_log_event *pending() const { return m_pending; @@ -150,12 +176,10 @@ public: { m_pending= pending; } -#endif IO_CACHE trans_log; // The transaction cache private: -#ifdef HAVE_ROW_BASED_REPLICATION /* Pending binrows event. This event is the event where the rows are currently written. @@ -167,7 +191,6 @@ public: Binlog position before the start of the current statement. */ my_off_t before_stmt_pos; -#endif }; handlerton *binlog_hton; @@ -1467,9 +1490,7 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data, were, we would have to ensure that we're not ending a statement inside a stored function. */ -#ifdef HAVE_ROW_BASED_REPLICATION thd->binlog_flush_pending_rows_event(TRUE); -#endif /* We write the transaction cache to the binary log if either we're committing the entire transaction, or if we are doing an @@ -1479,13 +1500,11 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data, { error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev); trx_data->reset(); -#ifdef HAVE_ROW_BASED_REPLICATION /* We need to step the table map version after writing the transaction cache to disk. */ mysql_bin_log.update_table_map_version(); -#endif statistic_increment(binlog_cache_use, &LOCK_status); if (trans_log->disk_writes != 0) { @@ -1494,7 +1513,6 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data, } } } -#ifdef HAVE_ROW_BASED_REPLICATION else { /* @@ -1503,12 +1521,11 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data, If rolling back a statement in a transaction, we truncate the transaction cache to remove the statement. - */ if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) trx_data->reset(); - else - trx_data->truncate(trx_data->before_stmt_pos); // ...statement + else // ...statement + trx_data->truncate(trx_data->before_stmt_pos); /* We need to step the table map version on a rollback to ensure @@ -1517,7 +1534,6 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data, */ mysql_bin_log.update_table_map_version(); } -#endif DBUG_RETURN(error); } @@ -2096,7 +2112,7 @@ bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host, if (my_b_write(&log_file, (byte*) "\t\t" ,2) < 0) goto err; - /* command_type, thread_id */ + /* command_type, thread_id */ length= my_snprintf(buff, 32, "%5ld ", (long) thread_id); if (my_b_write(&log_file, (byte*) buff, length)) @@ -3395,7 +3411,6 @@ int THD::binlog_setup_trx_data() DBUG_RETURN(0); } -#ifdef HAVE_ROW_BASED_REPLICATION /* Function to start a statement and optionally a transaction for the binary log. @@ -3437,18 +3452,7 @@ THD::binlog_start_trans_and_stmt() if (trx_data == NULL || trx_data->before_stmt_pos == MY_OFF_T_UNDEF) { - /* - The call to binlog_trans_log_savepos() might create the trx_data - structure, if it didn't exist before, so we save the position - into an auto variable and then write it into the transaction - data for the binary log (i.e., trx_data). - */ - my_off_t pos= 0; - binlog_trans_log_savepos(this, &pos); - trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot]; - - trx_data->before_stmt_pos= pos; - + this->binlog_set_stmt_begin(); if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) trans_register_ha(this, TRUE, binlog_hton); trans_register_ha(this, FALSE, binlog_hton); @@ -3456,6 +3460,51 @@ THD::binlog_start_trans_and_stmt() DBUG_VOID_RETURN; } +void THD::binlog_set_stmt_begin() { + binlog_trx_data *trx_data= + (binlog_trx_data*) ha_data[binlog_hton->slot]; + + /* + The call to binlog_trans_log_savepos() might create the trx_data + structure, if it didn't exist before, so we save the position + into an auto variable and then write it into the transaction + data for the binary log (i.e., trx_data). + */ + my_off_t pos= 0; + binlog_trans_log_savepos(this, &pos); + trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot]; + trx_data->before_stmt_pos= pos; +} + +int THD::binlog_flush_transaction_cache() +{ + DBUG_ENTER("binlog_flush_transaction_cache"); + binlog_trx_data *trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot]; + DBUG_PRINT("enter", ("trx_data=0x%lu", trx_data)); + if (trx_data) + DBUG_PRINT("enter", ("trx_data->before_stmt_pos=%u", + trx_data->before_stmt_pos)); + + /* + Write the transaction cache to the binary log. We don't flush and + sync the log file since we don't know if more will be written to + it. If the caller want the log file sync:ed, the caller has to do + it. + + The transaction data is only reset upon a successful write of the + cache to the binary log. + */ + + if (trx_data && likely(mysql_bin_log.is_open())) { + if (int error= mysql_bin_log.write_cache(&trx_data->trans_log, true, true)) + DBUG_RETURN(error); + trx_data->reset(); + } + + DBUG_RETURN(0); +} + + /* Write a table map to the binary log. */ @@ -3604,7 +3653,6 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, DBUG_RETURN(error); } -#endif /*HAVE_ROW_BASED_REPLICATION*/ /* Write an event to the binary log @@ -3637,11 +3685,9 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) we are inside a stored function, we do not end the statement since this will close all tables on the slave. */ -#ifdef HAVE_ROW_BASED_REPLICATION bool const end_stmt= thd->prelocked_mode && thd->lex->requires_prelocking(); thd->binlog_flush_pending_rows_event(end_stmt); -#endif /*HAVE_ROW_BASED_REPLICATION*/ pthread_mutex_lock(&LOCK_log); @@ -3671,7 +3717,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) } #endif /* HAVE_REPLICATION */ -#if defined(USING_TRANSACTIONS) && defined(HAVE_ROW_BASED_REPLICATION) +#if defined(USING_TRANSACTIONS) /* Should we write to the binlog cache or to the binlog on disk? Write to the binlog cache if: @@ -3706,7 +3752,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) LOCK_log. */ } -#endif /* USING_TRANSACTIONS && HAVE_ROW_BASED_REPLICATION */ +#endif /* USING_TRANSACTIONS */ DBUG_PRINT("info",("event type: %d",event_info->get_type_code())); /* @@ -3869,12 +3915,48 @@ uint MYSQL_BIN_LOG::next_file_id() /* + Write the contents of a cache to the binary log. + + SYNOPSIS + write_cache() + cache Cache to write to the binary log + lock_log True if the LOCK_log mutex should be aquired, false otherwise + sync_log True if the log should be flushed and sync:ed + + DESCRIPTION + Write the contents of the cache to the binary log. The cache will + be reset as a READ_CACHE to be able to read the contents from it. + */ + +int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log) +{ + Mutex_sentry sentry(lock_log ? &LOCK_log : NULL); + + if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) + return ER_ERROR_ON_WRITE; + uint bytes= my_b_bytes_in_cache(cache); + do + { + if (my_b_write(&log_file, cache->read_pos, bytes)) + return ER_ERROR_ON_WRITE; + cache->read_pos= cache->read_end; + } while ((bytes= my_b_fill(cache))); + + if (sync_log) + flush_and_sync(); + + return 0; // All OK +} + +/* Write a cached log entry to the binary log SYNOPSIS write() thd cache The cache to copy to the binlog + commit_event The commit event to print after writing the + contents of the cache. NOTE - We only come here if there is something in the cache. @@ -3934,20 +4016,10 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event) if (qinfo.write(&log_file)) goto err; } - /* Read from the file used to cache the queries .*/ - if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) - goto err; - length=my_b_bytes_in_cache(cache); - DBUG_EXECUTE_IF("half_binlogged_transaction", length-=100;); - do - { - /* Write data to the binary log file */ - if (my_b_write(&log_file, cache->read_pos, length)) - goto err; - cache->read_pos=cache->read_end; // Mark buffer used up - DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;); - } while ((length=my_b_fill(cache))); + if ((write_error= write_cache(cache, false, false))) + goto err; + if (commit_event && commit_event->write(&log_file)) goto err; #ifndef DBUG_OFF |