summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc1310
1 files changed, 764 insertions, 546 deletions
diff --git a/sql/log.cc b/sql/log.cc
index b63d72f0d4a..a9f486d88c1 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -52,12 +52,10 @@
#include "sql_plugin.h"
#include "rpl_handler.h"
-#ifdef WITH_WSREP
-#include "wsrep_mysqld.h"
-#endif /* WITH_WSREP */
#include "debug_sync.h"
#include "sql_show.h"
#include "my_pthread.h"
+#include "wsrep_mysqld.h"
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
@@ -66,8 +64,12 @@
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
+handlerton *binlog_hton;
LOGGER logger;
+const char *log_bin_index= 0;
+const char *log_bin_basename= 0;
+
MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period);
static bool test_if_number(const char *str,
@@ -94,6 +96,7 @@ ulong opt_binlog_dbug_fsync_sleep= 0;
mysql_mutex_t LOCK_prepare_ordered;
mysql_cond_t COND_prepare_ordered;
+mysql_mutex_t LOCK_after_binlog_sync;
mysql_mutex_t LOCK_commit_ordered;
static ulonglong binlog_status_var_num_commits;
@@ -263,9 +266,9 @@ public:
return m_pending;
}
- void set_pending(Rows_log_event *const pending)
+ void set_pending(Rows_log_event *const pending_arg)
{
- m_pending= pending;
+ m_pending= pending_arg;
}
void set_incident(void)
@@ -299,11 +302,12 @@ public:
incident= FALSE;
before_stmt_pos= MY_OFF_T_UNDEF;
/*
- The truncate function calls reinit_io_cache that calls my_b_flush_io_cache
- which may increase disk_writes. This breaks the disk_writes use by the
- binary log which aims to compute the ratio between in-memory cache usage
- and disk cache usage. To avoid this undesirable behavior, we reset the
- variable after truncating the cache.
+ The truncate function calls reinit_io_cache that calls
+ my_b_flush_io_cache which may increase disk_writes. This breaks
+ the disk_writes use by the binary log which aims to compute the
+ ratio between in-memory cache usage and disk cache usage. To
+ avoid this undesirable behavior, we reset the variable after
+ truncating the cache.
*/
cache_log.disk_writes= 0;
DBUG_ASSERT(empty());
@@ -526,16 +530,11 @@ private:
binlog_cache_mngr(const binlog_cache_mngr& info);
};
-handlerton *binlog_hton;
-#ifdef WITH_WSREP
-extern handlerton *wsrep_hton;
-#endif
-
bool LOGGER::is_log_table_enabled(uint log_table_type)
{
switch (log_table_type) {
case QUERY_LOG_SLOW:
- return (table_log_handler != NULL) && opt_slow_log;
+ return (table_log_handler != NULL) && global_system_variables.sql_log_slow;
case QUERY_LOG_GENERAL:
return (table_log_handler != NULL) && opt_log ;
default:
@@ -544,67 +543,6 @@ bool LOGGER::is_log_table_enabled(uint log_table_type)
}
}
-#ifdef WITH_WSREP
-IO_CACHE * get_trans_log(THD * thd)
-{
- binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*)
- thd_get_ha_data(thd, binlog_hton);
- if (cache_mngr)
- {
- return cache_mngr->get_binlog_cache_log(true);
- }
- else
- {
- WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id);
- return NULL;
- }
-}
-
-
-bool wsrep_trans_cache_is_empty(THD *thd)
-{
- binlog_cache_mngr *const cache_mngr=
- (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
- return (!cache_mngr || cache_mngr->trx_cache.empty());
-}
-
-void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
-{
- thd->binlog_flush_pending_rows_event(stmt_end);
-}
-void thd_binlog_trx_reset(THD * thd)
-{
- /*
- todo: fix autocommit select to not call the caller
- */
- if (thd_get_ha_data(thd, binlog_hton) != NULL)
- {
- binlog_cache_mngr *const cache_mngr=
- (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
- if (cache_mngr)
- {
- cache_mngr->reset(false, true);
- if (!cache_mngr->stmt_cache.empty())
- {
- WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query());
- cache_mngr->stmt_cache.reset();
- }
- }
- }
- thd->clear_binlog_table_maps();
-}
-
-void thd_binlog_rollback_stmt(THD * thd)
-{
- WSREP_DEBUG("thd_binlog_rollback_stmt :%ld", thd->thread_id);
- binlog_cache_mngr *const cache_mngr=
- (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
- if (cache_mngr) cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
-}
-
-#endif
-
-
/**
Check if a given table is opened log table
@@ -702,7 +640,7 @@ void Log_to_csv_event_handler::cleanup()
bool Log_to_csv_event_handler::
log_general(THD *thd, my_hrtime_t event_time, const char *user_host,
- uint user_host_len, int thread_id,
+ uint user_host_len, int thread_id_arg,
const char *command_type, uint command_type_len,
const char *sql_text, uint sql_text_len,
CHARSET_INFO *client_cs)
@@ -783,7 +721,7 @@ bool Log_to_csv_event_handler::
/* do a write */
if (table->field[1]->store(user_host, user_host_len, client_cs) ||
- table->field[2]->store((longlong) thread_id, TRUE) ||
+ table->field[2]->store((longlong) thread_id_arg, TRUE) ||
table->field[3]->store((longlong) global_system_variables.server_id,
TRUE) ||
table->field[4]->store(command_type, command_type_len, client_cs))
@@ -916,7 +854,7 @@ bool Log_to_csv_event_handler::
restore_record(table, s->default_values); // Get empty record
/* check that all columns exist */
- if (table->s->fields < 11)
+ if (table->s->fields < 13)
goto err;
/* store the time and user values */
@@ -997,6 +935,12 @@ bool Log_to_csv_event_handler::
if (table->field[11]->store((longlong) thd->thread_id, TRUE))
goto err;
+ /* Rows_affected */
+ if (table->field[12]->store(thd->get_stmt_da()->is_ok() ?
+ (longlong) thd->get_stmt_da()->affected_rows() :
+ 0, TRUE))
+ goto err;
+
/* log table entries are not replicated */
if (table->file->ha_write_row(table->record[0]))
goto err;
@@ -1106,7 +1050,7 @@ bool Log_to_file_event_handler::
bool Log_to_file_event_handler::
log_general(THD *thd, my_hrtime_t event_time, const char *user_host,
- uint user_host_len, int thread_id,
+ uint user_host_len, int thread_id_arg,
const char *command_type, uint command_type_len,
const char *sql_text, uint sql_text_len,
CHARSET_INFO *client_cs)
@@ -1115,7 +1059,7 @@ bool Log_to_file_event_handler::
thd->push_internal_handler(&error_handler);
bool retval= mysql_log.write(hrtime_to_time(event_time), user_host,
user_host_len,
- thread_id, command_type, command_type_len,
+ thread_id_arg, command_type, command_type_len,
sql_text, sql_text_len);
thd->pop_internal_handler();
return retval;
@@ -1126,7 +1070,7 @@ bool Log_to_file_event_handler::init()
{
if (!is_initialized)
{
- if (opt_slow_log)
+ if (global_system_variables.sql_log_slow)
mysql_slow_log.open_slow_log(opt_slow_logname);
if (opt_log)
@@ -1150,7 +1094,7 @@ void Log_to_file_event_handler::flush()
/* reopen log files */
if (opt_log)
mysql_log.reopen_file();
- if (opt_slow_log)
+ if (global_system_variables.sql_log_slow)
mysql_slow_log.reopen_file();
}
@@ -1278,7 +1222,7 @@ bool LOGGER::flush_slow_log()
logger.lock_exclusive();
/* Reopen slow log file */
- if (opt_slow_log)
+ if (global_system_variables.sql_log_slow)
file_log_handler->get_mysql_slow_log()->reopen_file();
/* End of log flush */
@@ -1348,11 +1292,11 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length,
if (*slow_log_handler_list)
{
/* do not log slow queries from replication threads */
- if (thd->slave_thread && !opt_log_slow_slave_statements)
+ if (!thd->variables.sql_log_slow)
return 0;
lock_shared();
- if (!opt_slow_log)
+ if (!global_system_variables.sql_log_slow)
{
unlock();
return 0;
@@ -1526,7 +1470,7 @@ bool LOGGER::activate_log_handler(THD* thd, uint log_type)
lock_exclusive();
switch (log_type) {
case QUERY_LOG_SLOW:
- if (!opt_slow_log)
+ if (!global_system_variables.sql_log_slow)
{
file_log= file_log_handler->get_mysql_slow_log();
@@ -1540,7 +1484,7 @@ bool LOGGER::activate_log_handler(THD* thd, uint log_type)
else
{
init_slow_log(log_output_options);
- opt_slow_log= TRUE;
+ global_system_variables.sql_log_slow= TRUE;
}
}
break;
@@ -1574,12 +1518,11 @@ bool LOGGER::activate_log_handler(THD* thd, uint log_type)
void LOGGER::deactivate_log_handler(THD *thd, uint log_type)
{
my_bool *tmp_opt= 0;
- MYSQL_LOG *file_log;
- LINT_INIT(file_log);
+ MYSQL_LOG *UNINIT_VAR(file_log);
switch (log_type) {
case QUERY_LOG_SLOW:
- tmp_opt= &opt_slow_log;
+ tmp_opt= &global_system_variables.sql_log_slow;
file_log= file_log_handler->get_mysql_slow_log();
break;
case QUERY_LOG_GENERAL:
@@ -1655,11 +1598,7 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos)
DBUG_ENTER("binlog_trans_log_savepos");
DBUG_ASSERT(pos != NULL);
binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data();
-#ifdef WITH_WSREP
DBUG_ASSERT((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open());
-#else
- DBUG_ASSERT(mysql_bin_log.is_open());
-#endif
*pos= cache_mngr->trx_cache.get_byte_position();
DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos));
DBUG_VOID_RETURN;
@@ -1707,16 +1646,8 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos)
int binlog_init(void *p)
{
binlog_hton= (handlerton *)p;
-#ifdef WITH_WSREP
- if (WSREP_ON)
- binlog_hton->state= SHOW_OPTION_YES;
- else
- {
-#endif /* WITH_WSREP */
- binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO;
-#ifdef WITH_WSREP
- }
-#endif /* WITH_WSREP */
+ binlog_hton->state= (WSREP_ON || opt_bin_log) ? SHOW_OPTION_YES
+ : SHOW_OPTION_NO;
binlog_hton->db_type=DB_TYPE_BINLOG;
binlog_hton->savepoint_offset= sizeof(my_off_t);
binlog_hton->close_connection= binlog_close_connection;
@@ -1860,7 +1791,9 @@ binlog_commit_flush_stmt_cache(THD *thd, bool all,
#ifdef WITH_WSREP
if (thd->wsrep_mysql_replicated > 0)
{
- WSREP_DEBUG("avoiding binlog_commit_flush_trx_cache: %d", thd->wsrep_mysql_replicated);
+ DBUG_ASSERT(WSREP_ON);
+ WSREP_DEBUG("avoiding binlog_commit_flush_trx_cache: %d",
+ thd->wsrep_mysql_replicated);
return 0;
}
#endif
@@ -2020,12 +1953,12 @@ static bool trans_cannot_safely_rollback(THD *thd, bool all)
return ((thd->variables.option_bits & OPTION_KEEP_LOG) ||
(trans_has_updated_non_trans_table(thd) &&
- WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) ||
+ thd->wsrep_binlog_format() == BINLOG_FORMAT_STMT) ||
(cache_mngr->trx_cache.changes_to_non_trans_temp_table() &&
- WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) ||
+ thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED) ||
(trans_has_updated_non_trans_table(thd) &&
ending_single_stmt_trans(thd,all) &&
- WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED));
+ thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED));
}
@@ -2047,9 +1980,12 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
DBUG_ENTER("binlog_commit");
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
-#ifdef WITH_WSREP
- if (!cache_mngr) DBUG_RETURN(0);
-#endif /* WITH_WSREP */
+
+ if (!cache_mngr)
+ {
+ DBUG_ASSERT(WSREP(thd));
+ DBUG_RETURN(0);
+ }
DBUG_PRINT("debug",
("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
@@ -2106,9 +2042,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
int error= 0;
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
-#ifdef WITH_WSREP
- if (!cache_mngr) DBUG_RETURN(0);
-#endif /* WITH_WSREP */
+
+ if (!cache_mngr)
+ {
+ DBUG_ASSERT(WSREP(thd));
+ DBUG_RETURN(0);
+ }
DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
YESNO(all),
@@ -2137,12 +2076,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
cache_mngr->reset(false, true);
DBUG_RETURN(error);
}
-#ifdef WITH_WSREP
- if (!wsrep_emulate_bin_log &&
- mysql_bin_log.check_write_error(thd))
-#else
- if (mysql_bin_log.check_write_error(thd))
-#endif
+ if (!wsrep_emulate_bin_log && mysql_bin_log.check_write_error(thd))
{
/*
"all == true" means that a "rollback statement" triggered the error and
@@ -2158,7 +2092,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
error |= binlog_truncate_trx_cache(thd, cache_mngr, all);
}
else if (!error)
- {
+ {
if (ending_trans(thd, all) && trans_cannot_safely_rollback(thd, all))
error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr);
/*
@@ -2173,9 +2107,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
else if (ending_trans(thd, all) ||
(!(thd->variables.option_bits & OPTION_KEEP_LOG) &&
(!stmt_has_updated_non_trans_table(thd) ||
- WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) &&
+ thd->wsrep_binlog_format() != BINLOG_FORMAT_STMT) &&
(!cache_mngr->trx_cache.changes_to_non_trans_temp_table() ||
- WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED)))
+ thd->wsrep_binlog_format() != BINLOG_FORMAT_MIXED)))
error= binlog_truncate_trx_cache(thd, cache_mngr, all);
}
@@ -2216,11 +2150,11 @@ void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional)
{
if (is_transactional)
{
- my_message(ER_TRANS_CACHE_FULL, ER(ER_TRANS_CACHE_FULL), MYF(MY_WME));
+ my_message(ER_TRANS_CACHE_FULL, ER_THD(thd, ER_TRANS_CACHE_FULL), MYF(MY_WME));
}
else
{
- my_message(ER_STMT_CACHE_FULL, ER(ER_STMT_CACHE_FULL), MYF(MY_WME));
+ my_message(ER_STMT_CACHE_FULL, ER_THD(thd, ER_STMT_CACHE_FULL), MYF(MY_WME));
}
}
else
@@ -2280,17 +2214,14 @@ bool MYSQL_BIN_LOG::check_write_error(THD *thd)
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
{
- DBUG_ENTER("binlog_savepoint_set");
int error= 1;
+ DBUG_ENTER("binlog_savepoint_set");
+
+ if (wsrep_emulate_bin_log)
+ DBUG_RETURN(0);
+
char buf[1024];
-#ifdef WITH_WSREP
- if (wsrep_emulate_bin_log) DBUG_RETURN(0);
- /*
- Clear table maps before writing SAVEPOINT event. This enforces
- recreation of table map events for the following row event.
- */
- thd->clear_binlog_table_maps();
-#endif /* WITH_WSREP */
+
String log_query(buf, sizeof(buf), &my_charset_bin);
if (log_query.copy(STRING_WITH_LEN("SAVEPOINT "), &my_charset_bin) ||
append_identifier(thd, &log_query,
@@ -2322,17 +2253,15 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
{
DBUG_ENTER("binlog_savepoint_rollback");
+ if (wsrep_emulate_bin_log)
+ DBUG_RETURN(0);
+
/*
Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some
non-transactional table. Otherwise, truncate the binlog cache starting
from the SAVEPOINT command.
*/
-#ifdef WITH_WSREP
- if (!wsrep_emulate_bin_log &&
- unlikely(trans_has_updated_non_trans_table(thd) ||
-#else
if (unlikely(trans_has_updated_non_trans_table(thd) ||
-#endif
(thd->variables.option_bits & OPTION_KEEP_LOG)))
{
char buf[1024];
@@ -2346,10 +2275,20 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
TRUE, FALSE, TRUE, errcode);
DBUG_RETURN(mysql_bin_log.write(&qinfo));
}
-#ifdef WITH_WSREP
- if (!wsrep_emulate_bin_log)
-#endif
- binlog_trans_log_truncate(thd, *(my_off_t*)sv);
+
+ binlog_trans_log_truncate(thd, *(my_off_t*)sv);
+
+ /*
+ When a SAVEPOINT is executed inside a stored function/trigger we force the
+ pending event to be flushed with a STMT_END_F flag and clear the table maps
+ as well to ensure that following DMLs will have a clean state to start
+ with. ROLLBACK inside a stored routine has to finalize possibly existing
+ current row-based pending event with cleaning up table maps. That ensures
+ that following DMLs will have a clean state to start with.
+ */
+ if (thd->in_sub_stmt)
+ thd->clear_binlog_table_maps();
+
DBUG_RETURN(0);
}
@@ -2486,13 +2425,13 @@ static void setup_windows_event_source()
nonzero if not possible to get unique filename.
*/
-static int find_uniq_filename(char *name)
+static int find_uniq_filename(char *name, ulong next_log_number)
{
uint i;
char buff[FN_REFLEN], ext_buf[FN_REFLEN];
struct st_my_dir *dir_info;
reg1 struct fileinfo *file_info;
- ulong max_found= 0, next= 0, number= 0;
+ ulong max_found, next, number;
size_t buf_length, length;
char *start, *end;
int error= 0;
@@ -2512,6 +2451,7 @@ static int find_uniq_filename(char *name)
DBUG_RETURN(1);
}
file_info= dir_info->dir_entry;
+ max_found= next_log_number ? next_log_number-1 : 0;
for (i= dir_info->number_of_files ; i-- ; file_info++)
{
if (strncmp(file_info->name, start, length) == 0 &&
@@ -2523,7 +2463,7 @@ static int find_uniq_filename(char *name)
my_dirend(dir_info);
/* check if reached the maximum possible extension number */
- if (max_found == MAX_LOG_UNIQUE_FN_EXT)
+ if (max_found >= MAX_LOG_UNIQUE_FN_EXT)
{
sql_print_error("Log filename extension number exhausted: %06lu. \
Please fix this by archiving old logs and \
@@ -2584,14 +2524,18 @@ void MYSQL_LOG::init(enum_log_type log_type_arg,
bool MYSQL_LOG::init_and_set_log_file_name(const char *log_name,
const char *new_name,
+ ulong next_log_number,
enum_log_type log_type_arg,
enum cache_type io_cache_type_arg)
{
init(log_type_arg, io_cache_type_arg);
- if (new_name && !strmov(log_file_name, new_name))
- return TRUE;
- else if (!new_name && generate_new_name(log_file_name, log_name))
+ if (new_name)
+ {
+ strmov(log_file_name, new_name);
+ }
+ else if (!new_name && generate_new_name(log_file_name, log_name,
+ next_log_number))
return TRUE;
return FALSE;
@@ -2624,7 +2568,8 @@ bool MYSQL_LOG::open(
PSI_file_key log_file_key,
#endif
const char *log_name, enum_log_type log_type_arg,
- const char *new_name, enum cache_type io_cache_type_arg)
+ const char *new_name, ulong next_log_number,
+ enum cache_type io_cache_type_arg)
{
char buff[FN_REFLEN];
MY_STAT f_stat;
@@ -2643,7 +2588,13 @@ bool MYSQL_LOG::open(
goto err;
}
- if (init_and_set_log_file_name(name, new_name,
+ /*
+ log_type is LOG_UNKNOWN if we should not generate a new name
+ This is only used when called from MYSQL_BINARY_LOG::open, which
+ has already updated log_file_name.
+ */
+ if (log_type_arg != LOG_UNKNOWN &&
+ init_and_set_log_file_name(name, new_name, next_log_number,
log_type_arg, io_cache_type_arg))
goto err;
@@ -2798,7 +2749,8 @@ void MYSQL_LOG::cleanup()
}
-int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name)
+int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name,
+ ulong next_log_number)
{
fn_format(new_name, log_name, mysql_data_home, "", 4);
if (log_type == LOG_BIN)
@@ -2806,10 +2758,12 @@ int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name)
if (!fn_ext(log_name)[0])
{
if (DBUG_EVALUATE_IF("binlog_inject_new_name_error", TRUE, FALSE) ||
- find_uniq_filename(new_name))
+ find_uniq_filename(new_name, next_log_number))
{
- if (current_thd)
- my_printf_error(ER_NO_UNIQUE_LOGFILE, ER(ER_NO_UNIQUE_LOGFILE),
+ THD *thd= current_thd;
+ if (thd)
+ my_printf_error(ER_NO_UNIQUE_LOGFILE,
+ ER_THD(thd, ER_NO_UNIQUE_LOGFILE),
MYF(ME_FATALERROR), log_name);
sql_print_error(ER_DEFAULT(ER_NO_UNIQUE_LOGFILE), log_name);
return 1;
@@ -2857,7 +2811,7 @@ void MYSQL_QUERY_LOG::reopen_file()
#ifdef HAVE_PSI_INTERFACE
m_log_file_key,
#endif
- save_name, log_type, 0, io_cache_type);
+ save_name, log_type, 0, 0, io_cache_type);
my_free(save_name);
mysql_mutex_unlock(&LOCK_log);
@@ -2892,7 +2846,7 @@ void MYSQL_QUERY_LOG::reopen_file()
*/
bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host,
- uint user_host_len, int thread_id,
+ uint user_host_len, int thread_id_arg,
const char *command_type, uint command_type_len,
const char *sql_text, uint sql_text_len)
{
@@ -2931,7 +2885,7 @@ bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host,
goto err;
/* command_type, thread_id */
- length= my_snprintf(buff, 32, "%5ld ", (long) thread_id);
+ length= my_snprintf(buff, 32, "%5ld ", (long) thread_id_arg);
if (my_b_write(&log_file, (uchar*) buff, length))
goto err;
@@ -3043,12 +2997,16 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
sprintf(lock_time_buff, "%.6f", ulonglong2double(lock_utime)/1000000.0);
if (my_b_printf(&log_file,
"# Thread_id: %lu Schema: %s QC_hit: %s\n" \
- "# Query_time: %s Lock_time: %s Rows_sent: %lu Rows_examined: %lu\n",
+ "# Query_time: %s Lock_time: %s Rows_sent: %lu Rows_examined: %lu\n" \
+ "# Rows_affected: %lu\n",
(ulong) thd->thread_id, (thd->db ? thd->db : ""),
((thd->query_plan_flags & QPLAN_QC) ? "Yes" : "No"),
query_time_buff, lock_time_buff,
(ulong) thd->get_sent_row_count(),
- (ulong) thd->get_examined_row_count()) == (size_t) -1)
+ (ulong) thd->get_examined_row_count(),
+ thd->get_stmt_da()->is_ok() ?
+ (ulong) thd->get_stmt_da()->affected_rows() :
+ 0) == (size_t) -1)
tmp_errno= errno;
if ((thd->variables.log_slow_verbosity & LOG_SLOW_VERBOSITY_QUERY_PLAN) &&
(thd->query_plan_flags &
@@ -3076,7 +3034,7 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
{
StringBuffer<128> buf;
DBUG_ASSERT(!thd->free_list);
- if (!print_explain_query(thd->lex, thd, &buf))
+ if (!print_explain_for_slow_log(thd->lex, thd, &buf))
my_b_printf(&log_file, "%s", buf.c_ptr_safe());
thd->free_items();
}
@@ -3140,7 +3098,7 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
if (! write_error)
{
write_error= 1;
- sql_print_error(ER(ER_ERROR_ON_WRITE), name, tmp_errno);
+ sql_print_error(ER_THD(thd, ER_ERROR_ON_WRITE), name, tmp_errno);
}
}
}
@@ -3155,8 +3113,8 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time,
first change fn_format() to cut the file name if it's too long.
*/
const char *MYSQL_LOG::generate_name(const char *log_name,
- const char *suffix,
- bool strip_ext, char *buff)
+ const char *suffix,
+ bool strip_ext, char *buff)
{
if (!log_name || !log_name[0])
{
@@ -3176,6 +3134,22 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
}
+/*
+ Print some additional information about addition/removal of
+ XID list entries.
+ TODO: Remove once MDEV-9510 is fixed.
+*/
+#ifdef WITH_WSREP
+#define WSREP_XID_LIST_ENTRY(X, Y) \
+ if (wsrep_debug) \
+ { \
+ char buf[FN_REFLEN]; \
+ strmake(buf, Y->binlog_name, Y->binlog_name_len); \
+ WSREP_DEBUG(X, buf, Y->binlog_id); \
+ }
+#else
+#define WSREP_XID_LIST_ENTRY(X, Y) do { } while(0)
+#endif
MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
:reset_master_pending(0), mark_xid_done_waiting(0),
@@ -3240,6 +3214,8 @@ void MYSQL_BIN_LOG::cleanup()
*/
DBUG_ASSERT(b->xid_count == 0);
DBUG_ASSERT(!binlog_xid_count_list.head());
+ WSREP_XID_LIST_ENTRY("MYSQL_BIN_LOG::cleanup(): Removing xid_list_entry "
+ "for %s (%lu)", b);
my_free(b);
}
@@ -3247,6 +3223,7 @@ void MYSQL_BIN_LOG::cleanup()
mysql_mutex_destroy(&LOCK_index);
mysql_mutex_destroy(&LOCK_xid_list);
mysql_mutex_destroy(&LOCK_binlog_background_thread);
+ mysql_mutex_destroy(&LOCK_binlog_end_pos);
mysql_cond_destroy(&update_cond);
mysql_cond_destroy(&COND_queue_busy);
mysql_cond_destroy(&COND_xid_list);
@@ -3292,6 +3269,9 @@ void MYSQL_BIN_LOG::init_pthread_objects()
&COND_binlog_background_thread, 0);
mysql_cond_init(key_BINLOG_COND_binlog_background_thread_end,
&COND_binlog_background_thread_end, 0);
+
+ mysql_mutex_init(m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos,
+ MY_MUTEX_INIT_SLOW);
}
@@ -3378,6 +3358,7 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg,
bool MYSQL_BIN_LOG::open(const char *log_name,
enum_log_type log_type_arg,
const char *new_name,
+ ulong next_log_number,
enum cache_type io_cache_type_arg,
ulong max_size_arg,
bool null_created_arg,
@@ -3404,8 +3385,9 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
DBUG_RETURN(1);
}
- if (init_and_set_log_file_name(log_name, new_name, log_type_arg,
- io_cache_type_arg))
+ /* We need to calculate new log file name for purge to delete old */
+ if (init_and_set_log_file_name(log_name, new_name, next_log_number,
+ log_type_arg, io_cache_type_arg))
{
sql_print_error("MYSQL_BIN_LOG::open failed to generate new file name.");
DBUG_RETURN(1);
@@ -3418,13 +3400,15 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
DBUG_EVALUATE_IF("fault_injection_registering_index", 1, 0))
{
/**
- TODO: although this was introduced to appease valgrind
- when injecting emulated faults using fault_injection_registering_index
- it may be good to consider what actually happens when
- open_purge_index_file succeeds but register or sync fails.
-
- Perhaps we might need the code below in MYSQL_BIN_LOG::cleanup
- for "real life" purposes as well?
+ TODO:
+ Although this was introduced to appease valgrind when
+ injecting emulated faults using
+ fault_injection_registering_index it may be good to consider
+ what actually happens when open_purge_index_file succeeds but
+ register or sync fails.
+
+ Perhaps we might need the code below in MYSQL_LOG_BIN::cleanup
+ for "real life" purposes as well?
*/
DBUG_EXECUTE_IF("fault_injection_registering_index", {
if (my_b_inited(&purge_index_file))
@@ -3447,7 +3431,9 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
#ifdef HAVE_PSI_INTERFACE
m_key_file_log,
#endif
- log_name, log_type_arg, new_name, io_cache_type_arg))
+ log_name,
+ LOG_UNKNOWN, /* Don't generate new name */
+ 0, 0, io_cache_type_arg))
{
#ifdef HAVE_REPLICATION
close_purge_index_file();
@@ -3491,25 +3477,52 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
*/
if (io_cache_type == WRITE_CACHE)
s.flags |= LOG_EVENT_BINLOG_IN_USE_F;
- s.checksum_alg= is_relay_log ?
- /* relay-log */
- /* inherit master's A descriptor if one has been received */
- (relay_log_checksum_alg=
- (relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
- relay_log_checksum_alg :
- /* otherwise use slave's local preference of RL events verification */
- (opt_slave_sql_verify_checksum == 0) ?
- (uint8) BINLOG_CHECKSUM_ALG_OFF : (uint8) binlog_checksum_options):
- /* binlog */
- (uint8) binlog_checksum_options;
+
+ if (is_relay_log)
+ {
+ if (relay_log_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF)
+ relay_log_checksum_alg=
+ opt_slave_sql_verify_checksum ? (enum_binlog_checksum_alg) binlog_checksum_options
+ : BINLOG_CHECKSUM_ALG_OFF;
+ s.checksum_alg= relay_log_checksum_alg;
+ }
+ else
+ s.checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options;
+
+ crypto.scheme = 0;
DBUG_ASSERT(s.checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if (!s.is_valid())
goto err;
s.dont_set_created= null_created_arg;
- if (s.write(&log_file))
+ if (write_event(&s))
goto err;
bytes_written+= s.data_written;
+ if (encrypt_binlog)
+ {
+ uint key_version= encryption_key_get_latest_version(ENCRYPTION_KEY_SYSTEM_DATA);
+ if (key_version == ENCRYPTION_KEY_VERSION_INVALID)
+ {
+ sql_print_error("Failed to enable encryption of binary logs");
+ goto err;
+ }
+
+ if (key_version != ENCRYPTION_KEY_NOT_ENCRYPTED)
+ {
+ if (my_random_bytes(crypto.nonce, sizeof(crypto.nonce)))
+ goto err;
+
+ Start_encryption_log_event sele(1, key_version, crypto.nonce);
+ sele.checksum_alg= s.checksum_alg;
+ if (write_event(&sele))
+ goto err;
+
+ // Start_encryption_log_event is written, enable the encryption
+ if (crypto.init(sele.crypto_scheme, key_version))
+ goto err;
+ }
+ }
+
if (!is_relay_log)
{
char buf[FN_REFLEN];
@@ -3549,7 +3562,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
*/
Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0);
- if (gl_ev.write(&log_file))
+ if (write_event(&gl_ev))
goto err;
/* Output a binlog checkpoint event at the start of the binlog file. */
@@ -3600,7 +3613,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
flush_io_cache(&log_file);
mysql_file_sync(log_file.file, MYF(MY_WME));
DBUG_SUICIDE(););
- if (ev.write(&log_file))
+ if (write_event(&ev))
goto err;
bytes_written+= ev.data_written;
}
@@ -3632,17 +3645,26 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
/* Don't set log_pos in event header */
description_event_for_queue->set_artificial_event();
- if (description_event_for_queue->write(&log_file))
+ if (write_event(description_event_for_queue))
goto err;
bytes_written+= description_event_for_queue->data_written;
}
if (flush_io_cache(&log_file) ||
mysql_file_sync(log_file.file, MYF(MY_WME|MY_SYNC_FILESIZE)))
goto err;
- mysql_mutex_lock(&LOCK_commit_ordered);
- strmake_buf(last_commit_pos_file, log_file_name);
- last_commit_pos_offset= my_b_tell(&log_file);
- mysql_mutex_unlock(&LOCK_commit_ordered);
+
+ my_off_t offset= my_b_tell(&log_file);
+
+ if (!is_relay_log)
+ {
+ /* update binlog_end_pos so that it can be read by after sync hook */
+ reset_binlog_end_pos(log_file_name, offset);
+
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ strmake_buf(last_commit_pos_file, log_file_name);
+ last_commit_pos_offset= offset;
+ mysql_mutex_unlock(&LOCK_commit_ordered);
+ }
if (write_file_name_to_index_file)
{
@@ -3687,9 +3709,13 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
/* Remove any initial entries with no pending XIDs. */
while ((b= binlog_xid_count_list.head()) && b->xid_count == 0)
{
+ WSREP_XID_LIST_ENTRY("MYSQL_BIN_LOG::open(): Removing xid_list_entry for "
+ "%s (%lu)", b);
my_free(binlog_xid_count_list.get());
}
mysql_cond_broadcast(&COND_xid_list);
+ WSREP_XID_LIST_ENTRY("MYSQL_BIN_LOG::open(): Adding new xid_list_entry for "
+ "%s (%lu)", new_xid_list_entry);
binlog_xid_count_list.push_back(new_xid_list_entry);
mysql_mutex_unlock(&LOCK_xid_list);
@@ -3750,6 +3776,7 @@ int MYSQL_BIN_LOG::get_current_log(LOG_INFO* linfo)
int MYSQL_BIN_LOG::raw_get_current_log(LOG_INFO* linfo)
{
+ mysql_mutex_assert_owner(&LOCK_log);
strmake_buf(linfo->log_file_name, log_file_name);
linfo->pos = my_b_tell(&log_file);
return 0;
@@ -3882,7 +3909,10 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name,
error= !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
break;
}
-
+ if (fname[length-1] != '\n')
+ continue; // Not a log entry
+ fname[length-1]= 0; // Remove end \n
+
// extend relative paths and match against full path
if (normalize_binlog_name(full_fname, fname, is_relay_log))
{
@@ -3893,11 +3923,10 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name,
// if the log entry matches, null string matching anything
if (!log_name ||
- (log_name_len == fname_len-1 && full_fname[log_name_len] == '\n' &&
+ (log_name_len == fname_len &&
!memcmp(full_fname, full_log_name, log_name_len)))
{
DBUG_PRINT("info", ("Found log file entry"));
- full_fname[fname_len-1]= 0; // remove last \n
linfo->index_file_start_offset= offset;
linfo->index_file_offset = my_b_tell(&index_file);
break;
@@ -3982,8 +4011,10 @@ err:
The new index file will only contain this file.
- @param thd Thread
- @param create_new_log 1 if we should start writing to a new log file
+ @param thd Thread id. This can be zero in case of resetting
+ relay logs
+ @param create_new_log 1 if we should start writing to a new log file
+ @param next_log_number min number of next log file to use, if possible.
@note
If not called from slave thread, write start event to new log
@@ -3994,8 +4025,9 @@ err:
1 error
*/
-bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
- rpl_gtid *init_state, uint32 init_state_len)
+bool MYSQL_BIN_LOG::reset_logs(THD *thd, bool create_new_log,
+ rpl_gtid *init_state, uint32 init_state_len,
+ ulong next_log_number)
{
LOG_INFO linfo;
bool error=0;
@@ -4028,9 +4060,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
mysql_mutex_unlock(&LOCK_xid_list);
}
- DEBUG_SYNC(thd, "reset_logs_after_set_reset_master_pending");
- if (thd)
- ha_reset_logs(thd);
+ DEBUG_SYNC_C_IF_THD(thd, "reset_logs_after_set_reset_master_pending");
/*
We need to get both locks to be sure that no one is trying to
write to the index log file.
@@ -4045,7 +4075,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
Without binlog, we cannot XA recover prepared-but-not-committed
transactions in engines. So force a commit checkpoint first.
- Note that we take and immediately release LOCK_commit_ordered. This has
+ Note that we take and immediately
+ release LOCK_after_binlog_sync/LOCK_commit_ordered. This has
the effect to ensure that any on-going group commit (in
trx_group_commit_leader()) has completed before we request the checkpoint,
due to the chaining of LOCK_log and LOCK_commit_ordered in that function.
@@ -4056,7 +4087,10 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
commit_ordered() in the engine of some transaction, and then a crash
later would leave such transaction not recoverable.
*/
+
+ mysql_mutex_lock(&LOCK_after_binlog_sync);
mysql_mutex_lock(&LOCK_commit_ordered);
+ mysql_mutex_unlock(&LOCK_after_binlog_sync);
mysql_mutex_unlock(&LOCK_commit_ordered);
mark_xids_active(current_binlog_id, 1);
@@ -4109,7 +4143,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
{
uint errcode= purge_log_get_error_code(err);
sql_print_error("Failed to locate old binlog or relay log files");
- my_message(errcode, ER(errcode), MYF(0));
+ my_message(errcode, ER_THD_OR_DEFAULT(thd, errcode), MYF(0));
error= 1;
goto err;
}
@@ -4120,9 +4154,12 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
{
if (my_errno == ENOENT)
{
- push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
- ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
- linfo.log_file_name);
+ if (thd)
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
+ ER_LOG_PURGE_NO_FILE,
+ ER_THD(thd, ER_LOG_PURGE_NO_FILE),
+ linfo.log_file_name);
+
sql_print_information("Failed to delete file '%s'",
linfo.log_file_name);
my_errno= 0;
@@ -4130,13 +4167,14 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
}
else
{
- push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
- ER_BINLOG_PURGE_FATAL_ERR,
- "a problem with deleting %s; "
- "consider examining correspondence "
- "of your binlog index file "
- "to the actual binlog files",
- linfo.log_file_name);
+ if (thd)
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
+ ER_BINLOG_PURGE_FATAL_ERR,
+ "a problem with deleting %s; "
+ "consider examining correspondence "
+ "of your binlog index file "
+ "to the actual binlog files",
+ linfo.log_file_name);
error= 1;
goto err;
}
@@ -4159,9 +4197,11 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
{
if (my_errno == ENOENT)
{
- push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
- ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
- index_file_name);
+ if (thd)
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
+ ER_LOG_PURGE_NO_FILE,
+ ER_THD(thd, ER_LOG_PURGE_NO_FILE),
+ index_file_name);
sql_print_information("Failed to delete file '%s'",
index_file_name);
my_errno= 0;
@@ -4169,19 +4209,21 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
}
else
{
- push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
- ER_BINLOG_PURGE_FATAL_ERR,
- "a problem with deleting %s; "
- "consider examining correspondence "
- "of your binlog index file "
- "to the actual binlog files",
- index_file_name);
+ if (thd)
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
+ ER_BINLOG_PURGE_FATAL_ERR,
+ "a problem with deleting %s; "
+ "consider examining correspondence "
+ "of your binlog index file "
+ "to the actual binlog files",
+ index_file_name);
error= 1;
goto err;
}
}
if (create_new_log && !open_index_file(index_file_name, 0, FALSE))
- if ((error= open(save_name, log_type, 0, io_cache_type, max_size, 0, FALSE)))
+ if ((error= open(save_name, log_type, 0, next_log_number,
+ io_cache_type, max_size, 0, FALSE)))
goto err;
my_free((void *) save_name);
@@ -4208,6 +4250,8 @@ err:
if (b->binlog_id == current_binlog_id)
break;
DBUG_ASSERT(b->xid_count == 0);
+ WSREP_XID_LIST_ENTRY("MYSQL_BIN_LOG::reset_logs(): Removing "
+ "xid_list_entry for %s (%lu)", b);
my_free(binlog_xid_count_list.get());
}
mysql_cond_broadcast(&COND_xid_list);
@@ -4325,12 +4369,9 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
if((error=find_log_pos(&rli->linfo, rli->event_relay_log_name, 0)) ||
(error=find_next_log(&rli->linfo, 0)))
{
- char buff[22];
- sql_print_error("next log error: %d offset: %s log: %s included: %d",
- error,
- llstr(rli->linfo.index_file_offset,buff),
- rli->event_relay_log_name,
- included);
+ sql_print_error("next log error: %d offset: %llu log: %s included: %d",
+ error, rli->linfo.index_file_offset,
+ rli->event_relay_log_name, included);
goto err;
}
@@ -4372,14 +4413,9 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
*/
if ((errcode= find_log_pos(&rli->linfo, rli->event_relay_log_name, 0)))
{
- char buff[22];
- if (!error)
- error= errcode;
- sql_print_error("next log error: %d offset: %s log: %s included: %d",
- errcode,
- llstr(rli->linfo.index_file_offset,buff),
- rli->group_relay_log_name,
- included);
+ sql_print_error("next log error: %d offset: %llu log: %s included: %d",
+ errcode, rli->linfo.index_file_offset,
+ rli->group_relay_log_name, included);
goto err;
}
@@ -4655,7 +4691,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space,
if (thd)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
+ ER_LOG_PURGE_NO_FILE, ER_THD(thd, ER_LOG_PURGE_NO_FILE),
log_info.log_file_name);
}
sql_print_information("Failed to execute mysql_file_stat on file '%s'",
@@ -4713,13 +4749,6 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space,
}
error= 0;
- if (!need_mutex)
- {
- /*
- This is to avoid triggering an error in NDB.
- */
- ha_binlog_index_purge_file(current_thd, log_info.log_file_name);
- }
DBUG_PRINT("info",("purging %s",log_info.log_file_name));
if (!my_delete(log_info.log_file_name, MYF(0)))
@@ -4734,7 +4763,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space,
if (thd)
{
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
- ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE),
+ ER_LOG_PURGE_NO_FILE, ER_THD(thd, ER_LOG_PURGE_NO_FILE),
log_info.log_file_name);
}
sql_print_information("Failed to delete file '%s'",
@@ -4806,7 +4835,6 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time)
LOG_INFO log_info;
MY_STAT stat_area;
THD *thd= current_thd;
-
DBUG_ENTER("purge_logs_before_date");
mysql_mutex_lock(&LOCK_index);
@@ -4872,24 +4900,24 @@ err:
bool
-MYSQL_BIN_LOG::can_purge_log(const char *log_file_name)
+MYSQL_BIN_LOG::can_purge_log(const char *log_file_name_arg)
{
xid_count_per_binlog *b;
- if (is_active(log_file_name))
+ if (is_active(log_file_name_arg))
return false;
mysql_mutex_lock(&LOCK_xid_list);
{
I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list);
while ((b= it++) &&
- 0 != strncmp(log_file_name+dirname_length(log_file_name),
+ 0 != strncmp(log_file_name_arg+dirname_length(log_file_name_arg),
b->binlog_name, b->binlog_name_len))
;
}
mysql_mutex_unlock(&LOCK_xid_list);
if (b)
return false;
- return !log_in_use(log_file_name);
+ return !log_in_use(log_file_name_arg);
}
#endif /* HAVE_REPLICATION */
@@ -4945,6 +4973,20 @@ void MYSQL_BIN_LOG::make_log_name(char* buf, const char* log_ident)
bool MYSQL_BIN_LOG::is_active(const char *log_file_name_arg)
{
+ /**
+ * there should/must be mysql_mutex_assert_owner(&LOCK_log) here...
+ * but code violates this! (scary monsters and super creeps!)
+ *
+ * example stacktrace:
+ * #8 MYSQL_BIN_LOG::is_active
+ * #9 MYSQL_BIN_LOG::can_purge_log
+ * #10 MYSQL_BIN_LOG::purge_logs
+ * #11 MYSQL_BIN_LOG::purge_first_log
+ * #12 next_event
+ * #13 exec_relay_log_event
+ *
+ * I didn't investigate if this is ligit...(i.e if my comment is wrong)
+ */
return !strcmp(log_file_name, log_file_name_arg);
}
@@ -4993,8 +5035,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
char new_name[FN_REFLEN], *new_name_ptr, *old_name, *file_to_open;
uint close_flag;
bool delay_close= false;
- File old_file;
- LINT_INIT(old_file);
+ File UNINIT_VAR(old_file);
DBUG_ENTER("MYSQL_BIN_LOG::new_file_impl");
if (need_lock)
@@ -5018,7 +5059,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
We have to do this here and not in open as we want to store the
new file name in the current binary log file.
*/
- if ((error= generate_new_name(new_name, name)))
+ if ((error= generate_new_name(new_name, name, 0)))
goto end;
new_name_ptr=new_name;
@@ -5039,11 +5080,13 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
r.checksum_alg= relay_log_checksum_alg;
DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) ||
- (error= r.write(&log_file)))
+ (error= write_event(&r)))
{
DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno=2;);
close_on_error= TRUE;
- my_printf_error(ER_ERROR_ON_WRITE, ER(ER_CANT_OPEN_FILE), MYF(ME_FATALERROR), name, errno);
+ my_printf_error(ER_ERROR_ON_WRITE,
+ ER_THD_OR_DEFAULT(current_thd, ER_CANT_OPEN_FILE),
+ MYF(ME_FATALERROR), name, errno);
goto end;
}
bytes_written += r.data_written;
@@ -5078,14 +5121,15 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
binlog_checksum_options= checksum_alg_reset;
}
/*
- Note that at this point, log_state != LOG_CLOSED (important for is_open()).
+ Note that at this point, log_state != LOG_CLOSED
+ (important for is_open()).
*/
/*
new_file() is only used for rotation (in FLUSH LOGS or because size >
max_binlog_size or max_relay_log_size).
- If this is a binary log, the Format_description_log_event at the beginning of
- the new file should have created=0 (to distinguish with the
+ If this is a binary log, the Format_description_log_event at the
+ beginning of the new file should have created=0 (to distinguish with the
Format_description_log_event written at server startup, which should
trigger temp tables deletion on slaves.
*/
@@ -5097,14 +5141,15 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
{
/* reopen the binary log file. */
file_to_open= new_name_ptr;
- error= open(old_name, log_type, new_name_ptr, io_cache_type,
+ error= open(old_name, log_type, new_name_ptr, 0, io_cache_type,
max_size, 1, FALSE);
}
/* handle reopening errors */
if (error)
{
- my_printf_error(ER_CANT_OPEN_FILE, ER(ER_CANT_OPEN_FILE),
+ my_printf_error(ER_CANT_OPEN_FILE,
+ ER_THD_OR_DEFAULT(current_thd, ER_CANT_OPEN_FILE),
MYF(ME_FATALERROR), file_to_open, error);
close_on_error= TRUE;
}
@@ -5149,9 +5194,16 @@ end:
DBUG_RETURN(error);
}
+bool MYSQL_BIN_LOG::write_event(Log_event *ev, IO_CACHE *file)
+{
+ Log_event_writer writer(file, &crypto);
+ if (crypto.scheme && file == &log_file)
+ writer.ctx= alloca(crypto.ctx_size);
-bool
-MYSQL_BIN_LOG::append(Log_event *ev)
+ return writer.write(ev);
+}
+
+bool MYSQL_BIN_LOG::append(Log_event *ev)
{
bool res;
mysql_mutex_lock(&LOCK_log);
@@ -5168,11 +5220,8 @@ bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev)
mysql_mutex_assert_owner(&LOCK_log);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
- /*
- Log_event::write() is smart enough to use my_b_write() or
- my_b_append() depending on the kind of cache we have.
- */
- if (ev->write(&log_file))
+
+ if (write_event(ev))
{
error=1;
goto err;
@@ -5188,32 +5237,62 @@ err:
DBUG_RETURN(error);
}
-
-bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...)
+bool MYSQL_BIN_LOG::write_event_buffer(uchar* buf, uint len)
{
- bool error= 0;
- DBUG_ENTER("MYSQL_BIN_LOG::appendv");
- va_list(args);
- va_start(args,len);
+ bool error= 1;
+ uchar *ebuf= 0;
+ DBUG_ENTER("MYSQL_BIN_LOG::write_event_buffer");
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
mysql_mutex_assert_owner(&LOCK_log);
- do
+
+ if (crypto.scheme != 0)
{
- if (my_b_append(&log_file,(uchar*) buf,len))
- {
- error= 1;
+ DBUG_ASSERT(crypto.scheme == 1);
+
+ uint elen;
+ uchar iv[BINLOG_IV_LENGTH];
+
+ ebuf= (uchar*)my_safe_alloca(len);
+ if (!ebuf)
goto err;
- }
- bytes_written += len;
- } while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint)));
+
+ crypto.set_iv(iv, my_b_append_tell(&log_file));
+
+ /*
+ we want to encrypt everything, excluding the event length:
+ massage the data before the encryption
+ */
+ memcpy(buf + EVENT_LEN_OFFSET, buf, 4);
+
+ if (encryption_crypt(buf + 4, len - 4,
+ ebuf + 4, &elen,
+ crypto.key, crypto.key_length, iv, sizeof(iv),
+ ENCRYPTION_FLAG_ENCRYPT | ENCRYPTION_FLAG_NOPAD,
+ ENCRYPTION_KEY_SYSTEM_DATA, crypto.key_version))
+ goto err;
+
+ DBUG_ASSERT(elen == len - 4);
+
+ /* massage the data after the encryption */
+ memcpy(ebuf, ebuf + EVENT_LEN_OFFSET, 4);
+ int4store(ebuf + EVENT_LEN_OFFSET, len);
+
+ buf= ebuf;
+ }
+ if (my_b_append(&log_file, buf, len))
+ goto err;
+ bytes_written+= len;
+
+ error= 0;
DBUG_PRINT("info",("max_size: %lu",max_size));
if (flush_and_sync(0))
goto err;
if (my_b_append_tell(&log_file) > max_size)
error= new_file_without_locking();
err:
+ my_safe_afree(ebuf, len);
if (!error)
signal_update();
DBUG_RETURN(error);
@@ -5424,7 +5503,6 @@ binlog_cache_mngr *THD::binlog_setup_trx_data()
DBUG_RETURN(cache_mngr);
}
-
/*
Function to start a statement and optionally a transaction for the
binary log.
@@ -5508,6 +5586,7 @@ binlog_start_consistent_snapshot(handlerton *hton, THD *thd)
binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data();
/* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */
+ mysql_mutex_assert_owner(&LOCK_commit_ordered);
strmake_buf(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file);
cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset;
@@ -5545,12 +5624,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
is_transactional= 1;
/* Pre-conditions */
-#ifdef WITH_WSREP
- DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
- (WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()));
-#else
- DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
-#endif
+ DBUG_ASSERT(is_current_stmt_binlog_format_row());
+ DBUG_ASSERT(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open());
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
Table_map_log_event
@@ -5564,6 +5639,7 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
IO_CACHE *file=
cache_mngr->get_binlog_cache_log(use_trans_cache(this, is_transactional));
+ Log_event_writer writer(file);
binlog_cache_data *cache_data=
cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional));
@@ -5572,14 +5648,14 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
Annotate_rows_log_event anno(table->in_use, is_transactional, false);
/* Annotate event should be written not more than once */
*with_annotate= 0;
- if ((error= anno.write(file)))
+ if ((error= writer.write(&anno)))
{
if (my_errno == EFBIG)
cache_data->set_incident();
DBUG_RETURN(error);
}
}
- if ((error= the_event.write(file)))
+ if ((error= writer.write(&the_event)))
DBUG_RETURN(error);
binlog_table_maps++;
@@ -5690,11 +5766,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
-#ifdef WITH_WSREP
DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open());
-#else
- DBUG_ASSERT(mysql_bin_log.is_open());
-#endif
DBUG_PRINT("enter", ("event: 0x%lx", (long) event));
int error= 0;
@@ -5710,14 +5782,14 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
if (Rows_log_event* pending= cache_data->pending())
{
- IO_CACHE *file= &cache_data->cache_log;
+ Log_event_writer writer(&cache_data->cache_log);
/*
Write pending event to the cache.
*/
DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending",
{DBUG_SET("+d,simulate_file_write_error");});
- if (pending->write(file))
+ if (writer.write(pending))
{
set_write_error(thd, is_transactional);
if (check_write_error(thd) && cache_data &&
@@ -5746,13 +5818,26 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
bool is_transactional, uint64 commit_id)
{
rpl_gtid gtid;
- uint32 domain_id= thd->variables.gtid_domain_id;
- uint32 server_id= thd->variables.server_id;
- uint64 seq_no= thd->variables.gtid_seq_no;
+ uint32 domain_id;
+ uint32 local_server_id;
+ uint64 seq_no;
int err;
DBUG_ENTER("write_gtid_event");
DBUG_PRINT("enter", ("standalone: %d", standalone));
-
+
+#ifdef WITH_WSREP
+ if (WSREP(thd) && thd->wsrep_trx_meta.gtid.seqno != -1 && wsrep_gtid_mode)
+ {
+ domain_id= wsrep_gtid_domain_id;
+ } else {
+#endif /* WITH_WSREP */
+ domain_id= thd->variables.gtid_domain_id;
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
+ local_server_id= thd->variables.server_id;
+ seq_no= thd->variables.gtid_seq_no;
+
if (thd->variables.option_bits & OPTION_GTID_BEGIN)
{
DBUG_PRINT("error", ("OPTION_GTID_BEGIN is set. "
@@ -5770,7 +5855,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
{
/* Use the specified sequence number. */
gtid.domain_id= domain_id;
- gtid.server_id= server_id;
+ gtid.server_id= local_server_id;
gtid.seq_no= seq_no;
err= rpl_global_gtid_binlog_state.update(&gtid, opt_gtid_strict_mode);
if (err && thd->get_stmt_da()->sql_errno()==ER_GTID_STRICT_OUT_OF_ORDER)
@@ -5780,7 +5865,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
{
/* Allocate the next sequence number for the GTID. */
err= rpl_global_gtid_binlog_state.update_with_next_gtid(domain_id,
- server_id, &gtid);
+ local_server_id, &gtid);
seq_no= gtid.seq_no;
}
if (err)
@@ -5792,7 +5877,8 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
commit_id);
/* Write the event to the binary log. */
- if (gtid_event.write(&mysql_bin_log.log_file))
+ DBUG_ASSERT(this == &mysql_bin_log);
+ if (write_event(&gtid_event))
DBUG_RETURN(true);
status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written);
@@ -5808,7 +5894,7 @@ MYSQL_BIN_LOG::write_state_to_file()
char buf[FN_REFLEN];
int err;
bool opened= false;
- bool inited= false;
+ bool log_inited= false;
fn_format(buf, opt_bin_logname, mysql_data_home, ".state",
MY_UNPACK_FILENAME);
@@ -5823,10 +5909,10 @@ MYSQL_BIN_LOG::write_state_to_file()
if ((err= init_io_cache(&cache, file_no, IO_SIZE, WRITE_CACHE, 0, 0,
MYF(MY_WME|MY_WAIT_IF_FULL))))
goto err;
- inited= true;
+ log_inited= true;
if ((err= rpl_global_gtid_binlog_state.write_to_iocache(&cache)))
goto err;
- inited= false;
+ log_inited= false;
if ((err= end_io_cache(&cache)))
goto err;
if ((err= mysql_file_sync(file_no, MYF(MY_WME|MY_SYNC_FILESIZE))))
@@ -5835,7 +5921,7 @@ MYSQL_BIN_LOG::write_state_to_file()
err:
sql_print_error("Error writing binlog state to file '%s'.\n", buf);
- if (inited)
+ if (log_inited)
end_io_cache(&cache);
end:
if (opened)
@@ -5861,7 +5947,7 @@ MYSQL_BIN_LOG::read_state_from_file()
char buf[FN_REFLEN];
int err;
bool opened= false;
- bool inited= false;
+ bool log_inited= false;
fn_format(buf, opt_bin_logname, mysql_data_home, ".state",
MY_UNPACK_FILENAME);
@@ -5888,7 +5974,7 @@ MYSQL_BIN_LOG::read_state_from_file()
if ((err= init_io_cache(&cache, file_no, IO_SIZE, READ_CACHE, 0, 0,
MYF(MY_WME|MY_WAIT_IF_FULL))))
goto err;
- inited= true;
+ log_inited= true;
if ((err= rpl_global_gtid_binlog_state.read_from_iocache(&cache)))
goto err;
goto end;
@@ -5896,7 +5982,7 @@ MYSQL_BIN_LOG::read_state_from_file()
err:
sql_print_error("Error reading binlog GTID state from file '%s'.\n", buf);
end:
- if (inited)
+ if (log_inited)
end_io_cache(&cache);
if (opened)
mysql_file_close(file_no, MYF(0));
@@ -5934,11 +6020,11 @@ MYSQL_BIN_LOG::is_empty_state()
bool
-MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id,
+MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id_arg,
rpl_gtid *out_gtid)
{
rpl_gtid *gtid;
- if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id)))
+ if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id_arg)))
*out_gtid= *gtid;
return gtid != NULL;
}
@@ -5968,11 +6054,13 @@ MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no)
bool
-MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id, uint32 server_id,
+MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id,
+ uint32 server_id_arg,
uint64 seq_no)
{
return rpl_global_gtid_binlog_state.check_strict_sequence(domain_id,
- server_id, seq_no);
+ server_id_arg,
+ seq_no);
}
@@ -5989,22 +6077,19 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
binlog_cache_data *cache_data= 0;
bool is_trans_cache= FALSE;
bool using_trans= event_info->use_trans_cache();
- bool direct;
- ulong prev_binlog_id;
+ bool direct= event_info->use_direct_logging();
+ ulong UNINIT_VAR(prev_binlog_id);
DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)");
- LINT_INIT(prev_binlog_id);
-#ifdef WITH_WSREP
/*
When binary logging is not enabled (--log-bin=0), wsrep-patch partially
- enables it without opening the binlog file (MSQL_BIN_LOG::open().
- So, avoid writing directly to binlog file.
+ enables it without opening the binlog file (MYSQL_BIN_LOG::open().
+ So, avoid writing to binlog file.
*/
- if (wsrep_emulate_bin_log)
- direct= false;
- else
-#endif /* WITH_WSREP */
- direct= event_info->use_direct_logging();
+ if (direct &&
+ (wsrep_emulate_bin_log ||
+ (WSREP(thd) && !(thd->variables.option_bits & OPTION_BIN_LOG))))
+ DBUG_RETURN(0);
if (thd->variables.option_bits & OPTION_GTID_BEGIN)
{
@@ -6028,10 +6113,17 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
/*
We only end the statement if we are in a top-level statement. If
we are inside a stored function, we do not end the statement since
- this will close all tables on the slave.
+ this will close all tables on the slave. But there can be a special case
+ where we are inside a stored function/trigger and a SAVEPOINT is being
+ set in side the stored function/trigger. This SAVEPOINT execution will
+ force the pending event to be flushed without an STMT_END_F flag. This
+ will result in a case where following DMLs will be considered as part of
+ same statement and result in data loss on slave. Hence in this case we
+ force the end_stmt to be true.
*/
- bool const end_stmt=
- thd->locked_tables_mode && thd->lex->requires_prelocking();
+ bool const end_stmt= (thd->in_sub_stmt && thd->lex->sql_command ==
+ SQLCOM_SAVEPOINT) ? true :
+ (thd->locked_tables_mode && thd->lex->requires_prelocking());
if (thd->binlog_flush_pending_rows_event(end_stmt, using_trans))
DBUG_RETURN(error);
@@ -6040,13 +6132,9 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
mostly called if is_open() *was* true a few instructions before, but it
could have changed since.
*/
-#ifdef WITH_WSREP
/* applier and replayer can skip writing binlog events */
- if ((WSREP_EMULATE_BINLOG(thd) && (thd->wsrep_exec_mode != REPL_RECV)) ||
- is_open())
-#else
- if (likely(is_open()))
-#endif
+ if ((WSREP_EMULATE_BINLOG(thd) &&
+ IF_WSREP(thd->wsrep_exec_mode != REPL_RECV, 0)) || is_open())
{
my_off_t UNINIT_VAR(my_org_b_tell);
#ifdef HAVE_REPLICATION
@@ -6056,7 +6144,17 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
binlog_[wild_]{do|ignore}_table?" (WL#1049)"
*/
const char *local_db= event_info->get_db();
- if ((!(thd->variables.option_bits & OPTION_BIN_LOG)) ||
+
+ bool option_bin_log_flag= (thd->variables.option_bits & OPTION_BIN_LOG);
+
+ /*
+ Log all updates to binlog cache so that they can get replicated to other
+ nodes. A check has been added to stop them from getting logged into
+ binary log files.
+ */
+ if (WSREP(thd)) option_bin_log_flag= true;
+
+ if ((!(option_bin_log_flag)) ||
(thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT &&
thd->lex->sql_command != SQLCOM_SAVEPOINT &&
!binlog_filter->db_ok(local_db)))
@@ -6078,11 +6176,12 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
prev_binlog_id= current_binlog_id;
DBUG_EXECUTE_IF("binlog_force_commit_id",
{
- const LEX_STRING name= { C_STRING_WITH_LEN("commit_id") };
+ const LEX_STRING commit_name= { C_STRING_WITH_LEN("commit_id") };
bool null_value;
user_var_entry *entry=
(user_var_entry*) my_hash_search(&thd->user_vars,
- (uchar*) name.str, name.length);
+ (uchar*) commit_name.str,
+ commit_name.length);
commit_id= entry->val_int(&null_value);
});
if (write_gtid_event(thd, true, using_trans, commit_id))
@@ -6120,7 +6219,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
Annotate_rows_log_event anno(thd, using_trans, direct);
/* Annotate event should be written not more than once */
*with_annotate= 0;
- if (anno.write(file))
+ if (write_event(&anno, file))
goto err;
}
@@ -6134,7 +6233,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
thd->first_successful_insert_id_in_prev_stmt_for_binlog,
using_trans, direct);
- if (e.write(file))
+ if (write_event(&e, file))
goto err;
}
if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0)
@@ -6145,14 +6244,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT,
thd->auto_inc_intervals_in_cur_stmt_for_binlog.
minimum(), using_trans, direct);
- if (e.write(file))
+ if (write_event(&e, file))
goto err;
}
if (thd->rand_used)
{
Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2,
using_trans, direct);
- if (e.write(file))
+ if (write_event(&e, file))
goto err;
}
if (thd->user_var_events.elements)
@@ -6176,7 +6275,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
flags,
using_trans,
direct);
- if (e.write(file))
+ if (write_event(&e, file))
goto err;
}
}
@@ -6186,7 +6285,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
/*
Write the event.
*/
- if (event_info->write(file) ||
+ if (write_event(event_info, file) ||
DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0))
goto err;
@@ -6204,30 +6303,66 @@ err:
if ((error= flush_and_sync(&synced)))
{
}
- else if ((error= RUN_HOOK(binlog_storage, after_flush,
- (thd, log_file_name, file->pos_in_file, synced))))
- {
- sql_print_error("Failed to run 'after_flush' hooks");
- }
else
{
- signal_update();
- if ((error= rotate(false, &check_purge)))
- check_purge= false;
+ mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
+ mysql_mutex_assert_owner(&LOCK_log);
+ mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
+ mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
+ bool first= true;
+ bool last= true;
+ if ((error= RUN_HOOK(binlog_storage, after_flush,
+ (thd, log_file_name, file->pos_in_file,
+ synced, first, last))))
+ {
+ sql_print_error("Failed to run 'after_flush' hooks");
+ error= 1;
+ }
+ else
+ {
+ /* update binlog_end_pos so it can be read by dump thread
+ *
+ * note: must be _after_ the RUN_HOOK(after_flush) or else
+ * semi-sync-plugin might not have put the transaction into
+ * it's list before dump-thread tries to send it
+ */
+ update_binlog_end_pos(offset);
+
+ signal_update();
+ if ((error= rotate(false, &check_purge)))
+ check_purge= false;
+ }
}
}
status_var_add(thd->status_var.binlog_bytes_written,
offset - my_org_b_tell);
+ mysql_mutex_lock(&LOCK_after_binlog_sync);
+ mysql_mutex_unlock(&LOCK_log);
+
+ mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
+ mysql_mutex_assert_not_owner(&LOCK_log);
+ mysql_mutex_assert_owner(&LOCK_after_binlog_sync);
+ mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
+ bool first= true;
+ bool last= true;
+ if (RUN_HOOK(binlog_storage, after_sync,
+ (thd, log_file_name, file->pos_in_file,
+ first, last)))
+ {
+ error=1;
+ /* error is already printed inside hook */
+ }
+
/*
Take mutex to protect against a reader seeing partial writes of 64-bit
offset on 32-bit CPUs.
*/
mysql_mutex_lock(&LOCK_commit_ordered);
+ mysql_mutex_unlock(&LOCK_after_binlog_sync);
last_commit_pos_offset= offset;
mysql_mutex_unlock(&LOCK_commit_ordered);
- mysql_mutex_unlock(&LOCK_log);
if (check_purge)
checkpoint_and_purge(prev_binlog_id);
@@ -6397,15 +6532,15 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge)
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::rotate");
-#ifdef WITH_WSREP
- if (WSREP_ON && wsrep_to_isolation)
- {
- *check_purge= false;
- WSREP_DEBUG("avoiding binlog rotate due to TO isolation: %d",
- wsrep_to_isolation);
- DBUG_RETURN(0);
- }
-#endif
+
+ if (wsrep_to_isolation)
+ {
+ DBUG_ASSERT(WSREP_ON);
+ *check_purge= false;
+ WSREP_DEBUG("avoiding binlog rotate due to TO isolation: %d",
+ wsrep_to_isolation);
+ DBUG_RETURN(0);
+ }
//todo: fix the macro def and restore safe_mutex_assert_owner(&LOCK_log);
*check_purge= false;
@@ -6529,33 +6664,35 @@ uint MYSQL_BIN_LOG::next_file_id()
return res;
}
+class CacheWriter: public Log_event_writer
+{
+public:
+ ulong remains;
-/**
- Calculate checksum of possibly a part of an event containing at least
- the whole common header.
-
- @param buf the pointer to trans cache's buffer
- @param off the offset of the beginning of the event in the buffer
- @param event_len no-checksum length of the event
- @param length the current size of the buffer
-
- @param crc [in-out] the checksum
+ CacheWriter(THD *thd_arg, IO_CACHE *file_arg, bool do_checksum,
+ Binlog_crypt_data *cr)
+ : Log_event_writer(file_arg, cr), remains(0), thd(thd_arg), first(true)
+ { checksum_len= do_checksum ? BINLOG_CHECKSUM_LEN : 0; }
- Event size in incremented by @c BINLOG_CHECKSUM_LEN.
+ ~CacheWriter()
+ { status_var_add(thd->status_var.binlog_bytes_written, bytes_written); }
- @return 0 or number of unprocessed yet bytes of the event excluding
- the checksum part.
-*/
- static ulong fix_log_event_crc(uchar *buf, uint off, uint event_len,
- uint length, ha_checksum *crc)
-{
- ulong ret;
- uchar *event_begin= buf + off;
+ int write(uchar* pos, size_t len)
+ {
+ if (first)
+ write_header(pos, len);
+ else
+ write_data(pos, len);
- ret= length >= off + event_len ? 0 : off + event_len - length;
- *crc= my_checksum(*crc, event_begin, event_len - ret);
- return ret;
-}
+ remains -= len;
+ if ((first= !remains))
+ write_footer();
+ return 0;
+ }
+private:
+ THD *thd;
+ bool first;
+};
/*
Write the contents of a cache to the binary log.
@@ -6576,21 +6713,22 @@ uint MYSQL_BIN_LOG::next_file_id()
int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
{
+ DBUG_ENTER("MYSQL_BIN_LOG::write_cache");
+
mysql_mutex_assert_owner(&LOCK_log);
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
- ulong remains= 0; // part of unprocessed yet netto length of the event
long val;
ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
uchar header[LOG_EVENT_HEADER_LEN];
- ha_checksum crc= 0, crc_0= 0;
- my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF);
- uchar buf[BINLOG_CHECKSUM_LEN];
- DBUG_ENTER("MYSQL_BIN_LOG::write_cache");
+ CacheWriter writer(thd, &log_file, binlog_checksum_options, &crypto);
+
+ if (crypto.scheme)
+ writer.ctx= alloca(crypto.ctx_size);
// while there is just one alg the following must hold:
- DBUG_ASSERT(!do_checksum ||
+ DBUG_ASSERT(binlog_checksum_options == BINLOG_CHECKSUM_ALG_OFF ||
binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32);
/*
@@ -6609,7 +6747,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
group= (uint)my_b_tell(&log_file);
hdr_offs= carry= 0;
-
+
do
{
/*
@@ -6619,53 +6757,40 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
if (unlikely(carry > 0))
{
DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN);
+ uint tail= LOG_EVENT_HEADER_LEN - carry;
/* assemble both halves */
- memcpy(&header[carry], (char *)cache->read_pos,
- LOG_EVENT_HEADER_LEN - carry);
+ memcpy(&header[carry], (char *)cache->read_pos, tail);
+
+ ulong len= uint4korr(header + EVENT_LEN_OFFSET);
+ writer.remains= len;
/* fix end_log_pos */
- val= uint4korr(&header[LOG_POS_OFFSET]) + group +
- (end_log_pos_inc+= (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
- int4store(&header[LOG_POS_OFFSET], val);
+ end_log_pos_inc += writer.checksum_len;
+ val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc;
+ int4store(header + LOG_POS_OFFSET, val);
- if (do_checksum)
- {
- ulong len= uint4korr(&header[EVENT_LEN_OFFSET]);
- /* fix len */
- int4store(&header[EVENT_LEN_OFFSET], len + BINLOG_CHECKSUM_LEN);
- }
+ /* fix len */
+ len+= writer.checksum_len;
+ int4store(header + EVENT_LEN_OFFSET, len);
- /* write the first half of the split header */
- if (my_b_write(&log_file, header, carry))
+ if (writer.write(header, LOG_EVENT_HEADER_LEN))
DBUG_RETURN(ER_ERROR_ON_WRITE);
- status_var_add(thd->status_var.binlog_bytes_written, carry);
- /*
- copy fixed second half of header to cache so the correct
- version will be written later.
- */
- memcpy((char *)cache->read_pos, &header[carry],
- LOG_EVENT_HEADER_LEN - carry);
+ cache->read_pos+= tail;
+ length-= tail;
+ carry= 0;
/* next event header at ... */
- hdr_offs= uint4korr(&header[EVENT_LEN_OFFSET]) - carry -
- (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
-
- if (do_checksum)
- {
- DBUG_ASSERT(crc == crc_0 && remains == 0);
- crc= my_checksum(crc, header, carry);
- remains= uint4korr(header + EVENT_LEN_OFFSET) - carry -
- BINLOG_CHECKSUM_LEN;
- }
- carry= 0;
+ hdr_offs= len - LOG_EVENT_HEADER_LEN - writer.checksum_len;
}
/* if there is anything to write, process it. */
if (likely(length > 0))
{
+ DBUG_EXECUTE_IF("fail_binlog_write_1",
+ errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE););
/*
process all event-headers in this (partial) cache.
if next header is beyond current read-buffer,
@@ -6673,52 +6798,28 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
very next iteration, just "eventually").
*/
- /* crc-calc the whole buffer */
- if (do_checksum && hdr_offs >= length)
+ if (hdr_offs >= length)
{
-
- DBUG_ASSERT(remains != 0 && crc != crc_0);
-
- crc= my_checksum(crc, cache->read_pos, length);
- remains -= length;
- if (my_b_write(&log_file, cache->read_pos, length))
+ if (writer.write(cache->read_pos, length))
DBUG_RETURN(ER_ERROR_ON_WRITE);
- if (remains == 0)
- {
- int4store(buf, crc);
- if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- crc= crc_0;
- }
}
while (hdr_offs < length)
{
/*
- partial header only? save what we can get, process once
- we get the rest.
+ finish off with remains of the last event that crawls
+ from previous into the current buffer
*/
-
- if (do_checksum)
+ if (writer.remains != 0)
{
- if (remains != 0)
- {
- /*
- finish off with remains of the last event that crawls
- from previous into the current buffer
- */
- DBUG_ASSERT(crc != crc_0);
- crc= my_checksum(crc, cache->read_pos, hdr_offs);
- int4store(buf, crc);
- remains -= hdr_offs;
- DBUG_ASSERT(remains == 0);
- if (my_b_write(&log_file, cache->read_pos, hdr_offs) ||
- my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- crc= crc_0;
- }
+ if (writer.write(cache->read_pos, hdr_offs))
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
}
+ /*
+ partial header only? save what we can get, process once
+ we get the rest.
+ */
if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
{
carry= length - hdr_offs;
@@ -6729,37 +6830,25 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
{
/* we've got a full event-header, and it came in one piece */
uchar *ev= (uchar *)cache->read_pos + hdr_offs;
- uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
+ uint ev_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
uchar *log_pos= ev + LOG_POS_OFFSET;
+ end_log_pos_inc += writer.checksum_len;
/* fix end_log_pos */
- val= uint4korr(log_pos) + group +
- (end_log_pos_inc += (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
+ val= uint4korr(log_pos) + group + end_log_pos_inc;
int4store(log_pos, val);
- /* fix CRC */
- if (do_checksum)
- {
- /* fix length */
- int4store(ev + EVENT_LEN_OFFSET, event_len + BINLOG_CHECKSUM_LEN);
- remains= fix_log_event_crc(cache->read_pos, hdr_offs, event_len,
- length, &crc);
- if (my_b_write(&log_file, ev,
- remains == 0 ? event_len : length - hdr_offs))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- if (remains == 0)
- {
- int4store(buf, crc);
- if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- crc= crc_0; // crc is complete
- }
- }
+ /* fix length */
+ int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len);
+
+ writer.remains= ev_len;
+ if (writer.write(ev, std::min<uint>(ev_len, length - hdr_offs)))
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
/* next event header at ... */
- hdr_offs += event_len; // incr by the netto len
+ hdr_offs += ev_len; // incr by the netto len
- DBUG_ASSERT(!do_checksum || remains == 0 || hdr_offs >= length);
+ DBUG_ASSERT(!writer.checksum_len || writer.remains == 0 || hdr_offs >= length);
}
}
@@ -6773,21 +6862,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
*/
hdr_offs -= length;
}
-
- /* Write data to the binary log file */
- DBUG_EXECUTE_IF("fail_binlog_write_1",
- errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE););
- if (!do_checksum)
- if (my_b_write(&log_file, cache->read_pos, length))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- status_var_add(thd->status_var.binlog_bytes_written, length);
-
- cache->read_pos=cache->read_end; // Mark buffer used up
} while ((length= my_b_fill(cache)));
DBUG_ASSERT(carry == 0);
- DBUG_ASSERT(!do_checksum || remains == 0);
- DBUG_ASSERT(!do_checksum || crc == crc_0);
+ DBUG_ASSERT(!writer.checksum_len || writer.remains == 0);
DBUG_RETURN(0); // All OK
}
@@ -6832,7 +6910,7 @@ bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd)
if (likely(is_open()))
{
- error= ev.write(&log_file);
+ error= write_event(&ev);
status_var_add(thd->status_var.binlog_bytes_written, ev.data_written);
}
@@ -6861,6 +6939,9 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
}
offset= my_b_tell(&log_file);
+
+ update_binlog_end_pos(offset);
+
/*
Take mutex to protect against a reader seeing partial writes of 64-bit
offset on 32-bit CPUs.
@@ -6882,17 +6963,16 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd)
}
void
-MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name,
- uint len)
+MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name_arg, uint len)
{
my_off_t offset;
- Binlog_checkpoint_log_event ev(name, len);
+ Binlog_checkpoint_log_event ev(name_arg, len);
/*
Note that we must sync the binlog checkpoint to disk.
Otherwise a subsequent log purge could delete binlogs that XA recovery
thinks are needed (even though they are not really).
*/
- if (!ev.write(&log_file) && !flush_and_sync(0))
+ if (!write_event(&ev) && !flush_and_sync(0))
{
signal_update();
}
@@ -6910,6 +6990,9 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name,
}
offset= my_b_tell(&log_file);
+
+ update_binlog_end_pos(offset);
+
/*
Take mutex to protect against a reader seeing partial writes of 64-bit
offset on 32-bit CPUs.
@@ -6955,13 +7038,14 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
Ha_trx_info *ha_info;
DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
-#ifdef WITH_WSREP
/*
Control should not be allowed beyond this point in wsrep_emulate_bin_log
- mode.
+ mode. Also, do not write the cached updates to binlog if binary logging is
+ disabled (log-bin/sql_log_bin).
*/
- if (wsrep_emulate_bin_log) DBUG_RETURN(0);
-#endif /* WITH_WSREP */
+ if (wsrep_emulate_bin_log || !(thd->variables.option_bits & OPTION_BIN_LOG))
+ DBUG_RETURN(0);
+
entry.thd= thd;
entry.cache_mngr= cache_mngr;
entry.error= 0;
@@ -7119,7 +7203,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
wfc->wakeup_error= orig_entry->thd->killed_errno();
if (!wfc->wakeup_error)
wfc->wakeup_error= ER_QUERY_INTERRUPTED;
- my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0));
+ my_message(wfc->wakeup_error,
+ ER_THD(orig_entry->thd, wfc->wakeup_error), MYF(0));
DBUG_RETURN(-1);
}
}
@@ -7432,10 +7517,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
group_commit_entry *current, *last_in_queue;
group_commit_entry *queue= NULL;
bool check_purge= false;
- ulong binlog_id;
+ ulong UNINIT_VAR(binlog_id);
uint64 commit_id;
DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader");
- LINT_INIT(binlog_id);
{
DBUG_EXECUTE_IF("inject_binlog_commit_before_get_LOCK_log",
@@ -7487,11 +7571,12 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
commit_id= (last_in_queue == leader ? 0 : (uint64)leader->thd->query_id);
DBUG_EXECUTE_IF("binlog_force_commit_id",
{
- const LEX_STRING name= { C_STRING_WITH_LEN("commit_id") };
+ const LEX_STRING commit_name= { C_STRING_WITH_LEN("commit_id") };
bool null_value;
user_var_entry *entry=
(user_var_entry*) my_hash_search(&leader->thd->user_vars,
- (uchar*) name.str, name.length);
+ (uchar*) commit_name.str,
+ commit_name.length);
commit_id= entry->val_int(&null_value);
});
/*
@@ -7558,12 +7643,21 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
{
bool any_error= false;
bool all_error= true;
+
+ mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
+ mysql_mutex_assert_owner(&LOCK_log);
+ mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
+ mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
+ bool first= true, last;
for (current= queue; current != NULL; current= current->next)
{
+ last= current->next == NULL;
if (!current->error &&
RUN_HOOK(binlog_storage, after_flush,
- (current->thd, log_file_name,
- current->cache_mngr->last_commit_pos_offset, synced)))
+ (current->thd,
+ current->cache_mngr->last_commit_pos_file,
+ current->cache_mngr->last_commit_pos_offset, synced,
+ first, last)))
{
current->error= ER_ERROR_ON_WRITE;
current->commit_errno= -1;
@@ -7572,8 +7666,17 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
}
else
all_error= false;
+ first= false;
}
+ /* update binlog_end_pos so it can be read by dump thread
+ *
+ * note: must be _after_ the RUN_HOOK(after_flush) or else
+ * semi-sync-plugin might not have put the transaction into
+ * it's list before dump-thread tries to send it
+ */
+ update_binlog_end_pos(commit_offset);
+
if (any_error)
sql_print_error("Failed to run 'after_flush' hooks");
if (!all_error)
@@ -7614,18 +7717,54 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
commit_offset= my_b_write_tell(&log_file);
}
- DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
- mysql_mutex_lock(&LOCK_commit_ordered);
- last_commit_pos_offset= commit_offset;
+ DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_after_binlog_sync");
+ mysql_mutex_lock(&LOCK_after_binlog_sync);
/*
- We cannot unlock LOCK_log until we have locked LOCK_commit_ordered;
+ We cannot unlock LOCK_log until we have locked LOCK_after_binlog_sync;
otherwise scheduling could allow the next group commit to run ahead of us,
messing up the order of commit_ordered() calls. But as soon as
- LOCK_commit_ordered is obtained, we can let the next group commit start.
+ LOCK_after_binlog_sync is obtained, we can let the next group commit start.
*/
mysql_mutex_unlock(&LOCK_log);
DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log");
+
+ /*
+ Loop through threads and run the binlog_sync hook
+ */
+ {
+ mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
+ mysql_mutex_assert_not_owner(&LOCK_log);
+ mysql_mutex_assert_owner(&LOCK_after_binlog_sync);
+ mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
+
+ bool first= true, last;
+ for (current= queue; current != NULL; current= current->next)
+ {
+ last= current->next == NULL;
+ if (!current->error &&
+ RUN_HOOK(binlog_storage, after_sync,
+ (current->thd, current->cache_mngr->last_commit_pos_file,
+ current->cache_mngr->last_commit_pos_offset,
+ first, last)))
+ {
+ /* error is already printed inside hook */
+ }
+ first= false;
+ }
+ }
+
+ DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered");
+ mysql_mutex_lock(&LOCK_commit_ordered);
+ last_commit_pos_offset= commit_offset;
+
+ /*
+ Unlock LOCK_after_binlog_sync only *after* LOCK_commit_ordered has been
+ acquired so that groups can not reorder for the different stages of
+ the group commit procedure.
+ */
+ mysql_mutex_unlock(&LOCK_after_binlog_sync);
+ DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_after_binlog_sync");
++num_group_commits;
if (!opt_optimize_thread_scheduling)
@@ -7741,7 +7880,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
DBUG_RETURN(ER_ERROR_ON_WRITE);
});
- if (entry->end_event->write(&log_file))
+ if (write_event(entry->end_event))
{
entry->error_cache= NULL;
DBUG_RETURN(ER_ERROR_ON_WRITE);
@@ -7751,7 +7890,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
if (entry->incident_event)
{
- if (entry->incident_event->write(&log_file))
+ if (write_event(entry->incident_event))
{
entry->error_cache= NULL;
DBUG_RETURN(ER_ERROR_ON_WRITE);
@@ -7920,6 +8059,7 @@ void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd)
PSI_stage_info old_stage;
DBUG_ENTER("wait_for_update_relay_log");
+ mysql_mutex_assert_owner(&LOCK_log);
thd->ENTER_COND(&update_cond, &LOCK_log,
&stage_slave_has_read_all_relay_log,
&old_stage);
@@ -7951,6 +8091,7 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
DBUG_ENTER("wait_for_update_bin_log");
thd_wait_begin(thd, THD_WAIT_BINLOG);
+ mysql_mutex_assert_owner(&LOCK_log);
if (!timeout)
mysql_cond_wait(&update_cond, &LOCK_log);
else
@@ -7960,6 +8101,23 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
DBUG_RETURN(ret);
}
+int MYSQL_BIN_LOG::wait_for_update_binlog_end_pos(THD* thd,
+ struct timespec *timeout)
+{
+ int ret= 0;
+ DBUG_ENTER("wait_for_update_binlog_end_pos");
+
+ thd_wait_begin(thd, THD_WAIT_BINLOG);
+ mysql_mutex_assert_owner(get_binlog_end_pos_lock());
+ if (!timeout)
+ mysql_cond_wait(&update_cond, get_binlog_end_pos_lock());
+ else
+ ret= mysql_cond_timedwait(&update_cond, get_binlog_end_pos_lock(),
+ timeout);
+ thd_wait_end(thd);
+ DBUG_RETURN(ret);
+}
+
/**
Close the log file.
@@ -7993,11 +8151,11 @@ void MYSQL_BIN_LOG::close(uint exiting)
{
Stop_log_event s;
// the checksumming rule for relay-log case is similar to Rotate
- s.checksum_alg= is_relay_log ?
- (uint8) relay_log_checksum_alg : (uint8) binlog_checksum_options;
+ s.checksum_alg= is_relay_log ? relay_log_checksum_alg
+ : (enum_binlog_checksum_alg)binlog_checksum_options;
DBUG_ASSERT(!is_relay_log ||
relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
- s.write(&log_file);
+ write_event(&s);
bytes_written+= s.data_written;
signal_update();
@@ -8055,7 +8213,8 @@ void MYSQL_BIN_LOG::close(uint exiting)
if (mysql_file_close(index_file.file, MYF(0)) < 0 && ! write_error)
{
write_error= 1;
- sql_print_error(ER(ER_ERROR_ON_WRITE), index_file_name, errno);
+ sql_print_error(ER_THD_OR_DEFAULT(current_thd, ER_ERROR_ON_WRITE),
+ index_file_name, errno);
}
}
log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED;
@@ -8282,7 +8441,8 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer,
Add tag for slaves so that the user can see from which connection
the error originates.
*/
- tag_length= my_snprintf(tag, sizeof(tag), ER(ER_MASTER_LOG_PREFIX),
+ tag_length= my_snprintf(tag, sizeof(tag),
+ ER_THD(thd, ER_MASTER_LOG_PREFIX),
(int) thd->connection_name.length,
thd->connection_name.str);
}
@@ -8294,13 +8454,14 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer,
localtime_r(&skr, &tm_tmp);
start=&tm_tmp;
- fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d [%s] %.*s%.*s\n",
- start->tm_year % 100,
+ fprintf(stderr, "%d-%02d-%02d %2d:%02d:%02d %lu [%s] %.*s%.*s\n",
+ start->tm_year + 1900,
start->tm_mon+1,
start->tm_mday,
start->tm_hour,
start->tm_min,
start->tm_sec,
+ (unsigned long) pthread_self(),
(level == ERROR_LEVEL ? "ERROR" : level == WARNING_LEVEL ?
"Warning" : "Note"),
tag_length, tag,
@@ -8377,6 +8538,9 @@ void sql_print_information(const char *format, ...)
va_list args;
DBUG_ENTER("sql_print_information");
+ if (disable_log_notes)
+ DBUG_VOID_RETURN; // Skip notes during start/shutdown
+
va_start(args, format);
error_log_print(INFORMATION_LEVEL, format, args);
va_end(args);
@@ -8426,8 +8590,7 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
{
int cookie;
struct commit_entry entry;
- bool is_group_commit_leader;
- LINT_INIT(is_group_commit_leader);
+ bool UNINIT_VAR(is_group_commit_leader);
if (need_prepare_ordered)
{
@@ -9075,12 +9238,10 @@ int TC_LOG_MMAP::recover()
the first byte after magic signature is set to current
number of storage engines on startup
*/
- if (data[sizeof(tc_log_magic)] != total_ha_2pc)
+ if (data[sizeof(tc_log_magic)] > total_ha_2pc)
{
sql_print_error("Recovery failed! You must enable "
- "exactly %d storage engines that support "
- "two-phase commit protocol",
- data[sizeof(tc_log_magic)]);
+ "all engines that were enabled at the moment of the crash");
goto err1;
}
@@ -9163,7 +9324,7 @@ int TC_LOG_BINLOG::open(const char *opt_name)
{
mysql_mutex_lock(&LOCK_log);
/* generate a new binlog to mask a corrupted one */
- open(opt_name, LOG_BIN, 0, WRITE_CACHE, max_binlog_size, 0, TRUE);
+ open(opt_name, LOG_BIN, 0, 0, WRITE_CACHE, max_binlog_size, 0, TRUE);
mysql_mutex_unlock(&LOCK_log);
cleanup();
return 1;
@@ -9193,14 +9354,10 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
if (!cache_mngr)
-#ifdef WITH_WSREP
{
WSREP_DEBUG("Skipping empty log_xid: %s", thd->query());
DBUG_RETURN(0);
}
-#else
- DBUG_RETURN(0);
-#endif /* WITH_WSREP */
cache_mngr->using_xa= TRUE;
cache_mngr->xa_xid= xid;
@@ -9355,6 +9512,8 @@ TC_LOG_BINLOG::mark_xid_done(ulong binlog_id, bool write_checkpoint)
DBUG_ASSERT(b);
if (b->binlog_id == current || b->xid_count > 0)
break;
+ WSREP_XID_LIST_ENTRY("TC_LOG_BINLOG::mark_xid_done(): Removing "
+ "xid_list_entry for %s (%lu)", b);
my_free(binlog_xid_count_list.get());
}
@@ -9506,9 +9665,7 @@ binlog_background_thread(void *arg __attribute__((unused)))
THD_STAGE_INFO(thd, stage_binlog_stopping_background_thread);
- mysql_mutex_lock(&LOCK_thread_count);
delete thd;
- mysql_mutex_unlock(&LOCK_thread_count);
my_thread_end();
@@ -9668,6 +9825,13 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
break;
#endif
+ case START_ENCRYPTION_EVENT:
+ {
+ if (fdle->start_decryption((Start_encryption_log_event*) ev))
+ goto err2;
+ }
+ break;
+
default:
/* Nothing. */
break;
@@ -9744,6 +9908,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
sql_print_error("Error reading binlog files during recovery. Aborting.");
goto err2;
}
+ fdle->reset_crypto();
}
if (do_xa)
@@ -9935,15 +10100,14 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
{
ulong value= *((ulong *)save);
bool check_purge= false;
- ulong prev_binlog_id;
- LINT_INIT(prev_binlog_id);
+ ulong UNINIT_VAR(prev_binlog_id);
mysql_mutex_lock(mysql_bin_log.get_log_lock());
if(mysql_bin_log.is_open())
{
prev_binlog_id= mysql_bin_log.current_binlog_id;
if (binlog_checksum_options != value)
- mysql_bin_log.checksum_alg_reset= (uint8) value;
+ mysql_bin_log.checksum_alg_reset= (enum_binlog_checksum_alg)value;
if (mysql_bin_log.rotate(true, &check_purge))
check_purge= false;
}
@@ -9990,8 +10154,7 @@ static MYSQL_SYSVAR_ENUM(
binlog_checksum_options,
PLUGIN_VAR_RQCMDARG,
"Type of BINLOG_CHECKSUM_ALG. Include checksum for "
- "log events in the binary log. Possible values are NONE and CRC32; "
- "default is NONE.",
+ "log events in the binary log",
NULL,
binlog_checksum_update,
BINLOG_CHECKSUM_ALG_OFF,
@@ -10076,3 +10239,58 @@ maria_declare_plugin(binlog)
MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
}
maria_declare_plugin_end;
+
+#ifdef WITH_WSREP
+IO_CACHE * get_trans_log(THD * thd)
+{
+ DBUG_ASSERT(binlog_hton->slot != HA_SLOT_UNDEF);
+ binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*)
+ thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
+ return cache_mngr->get_binlog_cache_log(true);
+
+ WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id);
+ return NULL;
+}
+
+
+bool wsrep_trans_cache_is_empty(THD *thd)
+{
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ return (!cache_mngr || cache_mngr->trx_cache.empty());
+}
+
+
+void thd_binlog_trx_reset(THD * thd)
+{
+ /*
+ todo: fix autocommit select to not call the caller
+ */
+ if (thd_get_ha_data(thd, binlog_hton) != NULL)
+ {
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
+ {
+ cache_mngr->reset(false, true);
+ if (!cache_mngr->stmt_cache.empty())
+ {
+ WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query());
+ cache_mngr->stmt_cache.reset();
+ }
+ }
+ }
+ thd->clear_binlog_table_maps();
+}
+
+
+void thd_binlog_rollback_stmt(THD * thd)
+{
+ WSREP_DEBUG("thd_binlog_rollback_stmt :%ld", thd->thread_id);
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
+ cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
+}
+#endif /* WITH_WSREP */