diff options
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 1310 |
1 files changed, 764 insertions, 546 deletions
diff --git a/sql/log.cc b/sql/log.cc index b63d72f0d4a..a9f486d88c1 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -52,12 +52,10 @@ #include "sql_plugin.h" #include "rpl_handler.h" -#ifdef WITH_WSREP -#include "wsrep_mysqld.h" -#endif /* WITH_WSREP */ #include "debug_sync.h" #include "sql_show.h" #include "my_pthread.h" +#include "wsrep_mysqld.h" /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 @@ -66,8 +64,12 @@ #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") +handlerton *binlog_hton; LOGGER logger; +const char *log_bin_index= 0; +const char *log_bin_basename= 0; + MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period); static bool test_if_number(const char *str, @@ -94,6 +96,7 @@ ulong opt_binlog_dbug_fsync_sleep= 0; mysql_mutex_t LOCK_prepare_ordered; mysql_cond_t COND_prepare_ordered; +mysql_mutex_t LOCK_after_binlog_sync; mysql_mutex_t LOCK_commit_ordered; static ulonglong binlog_status_var_num_commits; @@ -263,9 +266,9 @@ public: return m_pending; } - void set_pending(Rows_log_event *const pending) + void set_pending(Rows_log_event *const pending_arg) { - m_pending= pending; + m_pending= pending_arg; } void set_incident(void) @@ -299,11 +302,12 @@ public: incident= FALSE; before_stmt_pos= MY_OFF_T_UNDEF; /* - The truncate function calls reinit_io_cache that calls my_b_flush_io_cache - which may increase disk_writes. This breaks the disk_writes use by the - binary log which aims to compute the ratio between in-memory cache usage - and disk cache usage. To avoid this undesirable behavior, we reset the - variable after truncating the cache. + The truncate function calls reinit_io_cache that calls + my_b_flush_io_cache which may increase disk_writes. This breaks + the disk_writes use by the binary log which aims to compute the + ratio between in-memory cache usage and disk cache usage. To + avoid this undesirable behavior, we reset the variable after + truncating the cache. */ cache_log.disk_writes= 0; DBUG_ASSERT(empty()); @@ -526,16 +530,11 @@ private: binlog_cache_mngr(const binlog_cache_mngr& info); }; -handlerton *binlog_hton; -#ifdef WITH_WSREP -extern handlerton *wsrep_hton; -#endif - bool LOGGER::is_log_table_enabled(uint log_table_type) { switch (log_table_type) { case QUERY_LOG_SLOW: - return (table_log_handler != NULL) && opt_slow_log; + return (table_log_handler != NULL) && global_system_variables.sql_log_slow; case QUERY_LOG_GENERAL: return (table_log_handler != NULL) && opt_log ; default: @@ -544,67 +543,6 @@ bool LOGGER::is_log_table_enabled(uint log_table_type) } } -#ifdef WITH_WSREP -IO_CACHE * get_trans_log(THD * thd) -{ - binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*) - thd_get_ha_data(thd, binlog_hton); - if (cache_mngr) - { - return cache_mngr->get_binlog_cache_log(true); - } - else - { - WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id); - return NULL; - } -} - - -bool wsrep_trans_cache_is_empty(THD *thd) -{ - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - return (!cache_mngr || cache_mngr->trx_cache.empty()); -} - -void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end) -{ - thd->binlog_flush_pending_rows_event(stmt_end); -} -void thd_binlog_trx_reset(THD * thd) -{ - /* - todo: fix autocommit select to not call the caller - */ - if (thd_get_ha_data(thd, binlog_hton) != NULL) - { - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - if (cache_mngr) - { - cache_mngr->reset(false, true); - if (!cache_mngr->stmt_cache.empty()) - { - WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query()); - cache_mngr->stmt_cache.reset(); - } - } - } - thd->clear_binlog_table_maps(); -} - -void thd_binlog_rollback_stmt(THD * thd) -{ - WSREP_DEBUG("thd_binlog_rollback_stmt :%ld", thd->thread_id); - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - if (cache_mngr) cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); -} - -#endif - - /** Check if a given table is opened log table @@ -702,7 +640,7 @@ void Log_to_csv_event_handler::cleanup() bool Log_to_csv_event_handler:: log_general(THD *thd, my_hrtime_t event_time, const char *user_host, - uint user_host_len, int thread_id, + uint user_host_len, int thread_id_arg, const char *command_type, uint command_type_len, const char *sql_text, uint sql_text_len, CHARSET_INFO *client_cs) @@ -783,7 +721,7 @@ bool Log_to_csv_event_handler:: /* do a write */ if (table->field[1]->store(user_host, user_host_len, client_cs) || - table->field[2]->store((longlong) thread_id, TRUE) || + table->field[2]->store((longlong) thread_id_arg, TRUE) || table->field[3]->store((longlong) global_system_variables.server_id, TRUE) || table->field[4]->store(command_type, command_type_len, client_cs)) @@ -916,7 +854,7 @@ bool Log_to_csv_event_handler:: restore_record(table, s->default_values); // Get empty record /* check that all columns exist */ - if (table->s->fields < 11) + if (table->s->fields < 13) goto err; /* store the time and user values */ @@ -997,6 +935,12 @@ bool Log_to_csv_event_handler:: if (table->field[11]->store((longlong) thd->thread_id, TRUE)) goto err; + /* Rows_affected */ + if (table->field[12]->store(thd->get_stmt_da()->is_ok() ? + (longlong) thd->get_stmt_da()->affected_rows() : + 0, TRUE)) + goto err; + /* log table entries are not replicated */ if (table->file->ha_write_row(table->record[0])) goto err; @@ -1106,7 +1050,7 @@ bool Log_to_file_event_handler:: bool Log_to_file_event_handler:: log_general(THD *thd, my_hrtime_t event_time, const char *user_host, - uint user_host_len, int thread_id, + uint user_host_len, int thread_id_arg, const char *command_type, uint command_type_len, const char *sql_text, uint sql_text_len, CHARSET_INFO *client_cs) @@ -1115,7 +1059,7 @@ bool Log_to_file_event_handler:: thd->push_internal_handler(&error_handler); bool retval= mysql_log.write(hrtime_to_time(event_time), user_host, user_host_len, - thread_id, command_type, command_type_len, + thread_id_arg, command_type, command_type_len, sql_text, sql_text_len); thd->pop_internal_handler(); return retval; @@ -1126,7 +1070,7 @@ bool Log_to_file_event_handler::init() { if (!is_initialized) { - if (opt_slow_log) + if (global_system_variables.sql_log_slow) mysql_slow_log.open_slow_log(opt_slow_logname); if (opt_log) @@ -1150,7 +1094,7 @@ void Log_to_file_event_handler::flush() /* reopen log files */ if (opt_log) mysql_log.reopen_file(); - if (opt_slow_log) + if (global_system_variables.sql_log_slow) mysql_slow_log.reopen_file(); } @@ -1278,7 +1222,7 @@ bool LOGGER::flush_slow_log() logger.lock_exclusive(); /* Reopen slow log file */ - if (opt_slow_log) + if (global_system_variables.sql_log_slow) file_log_handler->get_mysql_slow_log()->reopen_file(); /* End of log flush */ @@ -1348,11 +1292,11 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length, if (*slow_log_handler_list) { /* do not log slow queries from replication threads */ - if (thd->slave_thread && !opt_log_slow_slave_statements) + if (!thd->variables.sql_log_slow) return 0; lock_shared(); - if (!opt_slow_log) + if (!global_system_variables.sql_log_slow) { unlock(); return 0; @@ -1526,7 +1470,7 @@ bool LOGGER::activate_log_handler(THD* thd, uint log_type) lock_exclusive(); switch (log_type) { case QUERY_LOG_SLOW: - if (!opt_slow_log) + if (!global_system_variables.sql_log_slow) { file_log= file_log_handler->get_mysql_slow_log(); @@ -1540,7 +1484,7 @@ bool LOGGER::activate_log_handler(THD* thd, uint log_type) else { init_slow_log(log_output_options); - opt_slow_log= TRUE; + global_system_variables.sql_log_slow= TRUE; } } break; @@ -1574,12 +1518,11 @@ bool LOGGER::activate_log_handler(THD* thd, uint log_type) void LOGGER::deactivate_log_handler(THD *thd, uint log_type) { my_bool *tmp_opt= 0; - MYSQL_LOG *file_log; - LINT_INIT(file_log); + MYSQL_LOG *UNINIT_VAR(file_log); switch (log_type) { case QUERY_LOG_SLOW: - tmp_opt= &opt_slow_log; + tmp_opt= &global_system_variables.sql_log_slow; file_log= file_log_handler->get_mysql_slow_log(); break; case QUERY_LOG_GENERAL: @@ -1655,11 +1598,7 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos) DBUG_ENTER("binlog_trans_log_savepos"); DBUG_ASSERT(pos != NULL); binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data(); -#ifdef WITH_WSREP DBUG_ASSERT((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()); -#else - DBUG_ASSERT(mysql_bin_log.is_open()); -#endif *pos= cache_mngr->trx_cache.get_byte_position(); DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos)); DBUG_VOID_RETURN; @@ -1707,16 +1646,8 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos) int binlog_init(void *p) { binlog_hton= (handlerton *)p; -#ifdef WITH_WSREP - if (WSREP_ON) - binlog_hton->state= SHOW_OPTION_YES; - else - { -#endif /* WITH_WSREP */ - binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO; -#ifdef WITH_WSREP - } -#endif /* WITH_WSREP */ + binlog_hton->state= (WSREP_ON || opt_bin_log) ? SHOW_OPTION_YES + : SHOW_OPTION_NO; binlog_hton->db_type=DB_TYPE_BINLOG; binlog_hton->savepoint_offset= sizeof(my_off_t); binlog_hton->close_connection= binlog_close_connection; @@ -1860,7 +1791,9 @@ binlog_commit_flush_stmt_cache(THD *thd, bool all, #ifdef WITH_WSREP if (thd->wsrep_mysql_replicated > 0) { - WSREP_DEBUG("avoiding binlog_commit_flush_trx_cache: %d", thd->wsrep_mysql_replicated); + DBUG_ASSERT(WSREP_ON); + WSREP_DEBUG("avoiding binlog_commit_flush_trx_cache: %d", + thd->wsrep_mysql_replicated); return 0; } #endif @@ -2020,12 +1953,12 @@ static bool trans_cannot_safely_rollback(THD *thd, bool all) return ((thd->variables.option_bits & OPTION_KEEP_LOG) || (trans_has_updated_non_trans_table(thd) && - WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) || + thd->wsrep_binlog_format() == BINLOG_FORMAT_STMT) || (cache_mngr->trx_cache.changes_to_non_trans_temp_table() && - WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) || + thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED) || (trans_has_updated_non_trans_table(thd) && ending_single_stmt_trans(thd,all) && - WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED)); + thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED)); } @@ -2047,9 +1980,12 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) DBUG_ENTER("binlog_commit"); binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); -#ifdef WITH_WSREP - if (!cache_mngr) DBUG_RETURN(0); -#endif /* WITH_WSREP */ + + if (!cache_mngr) + { + DBUG_ASSERT(WSREP(thd)); + DBUG_RETURN(0); + } DBUG_PRINT("debug", ("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", @@ -2106,9 +2042,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) int error= 0; binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); -#ifdef WITH_WSREP - if (!cache_mngr) DBUG_RETURN(0); -#endif /* WITH_WSREP */ + + if (!cache_mngr) + { + DBUG_ASSERT(WSREP(thd)); + DBUG_RETURN(0); + } DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", YESNO(all), @@ -2137,12 +2076,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) cache_mngr->reset(false, true); DBUG_RETURN(error); } -#ifdef WITH_WSREP - if (!wsrep_emulate_bin_log && - mysql_bin_log.check_write_error(thd)) -#else - if (mysql_bin_log.check_write_error(thd)) -#endif + if (!wsrep_emulate_bin_log && mysql_bin_log.check_write_error(thd)) { /* "all == true" means that a "rollback statement" triggered the error and @@ -2158,7 +2092,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) error |= binlog_truncate_trx_cache(thd, cache_mngr, all); } else if (!error) - { + { if (ending_trans(thd, all) && trans_cannot_safely_rollback(thd, all)) error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr); /* @@ -2173,9 +2107,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) else if (ending_trans(thd, all) || (!(thd->variables.option_bits & OPTION_KEEP_LOG) && (!stmt_has_updated_non_trans_table(thd) || - WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) && + thd->wsrep_binlog_format() != BINLOG_FORMAT_STMT) && (!cache_mngr->trx_cache.changes_to_non_trans_temp_table() || - WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED))) + thd->wsrep_binlog_format() != BINLOG_FORMAT_MIXED))) error= binlog_truncate_trx_cache(thd, cache_mngr, all); } @@ -2216,11 +2150,11 @@ void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional) { if (is_transactional) { - my_message(ER_TRANS_CACHE_FULL, ER(ER_TRANS_CACHE_FULL), MYF(MY_WME)); + my_message(ER_TRANS_CACHE_FULL, ER_THD(thd, ER_TRANS_CACHE_FULL), MYF(MY_WME)); } else { - my_message(ER_STMT_CACHE_FULL, ER(ER_STMT_CACHE_FULL), MYF(MY_WME)); + my_message(ER_STMT_CACHE_FULL, ER_THD(thd, ER_STMT_CACHE_FULL), MYF(MY_WME)); } } else @@ -2280,17 +2214,14 @@ bool MYSQL_BIN_LOG::check_write_error(THD *thd) static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) { - DBUG_ENTER("binlog_savepoint_set"); int error= 1; + DBUG_ENTER("binlog_savepoint_set"); + + if (wsrep_emulate_bin_log) + DBUG_RETURN(0); + char buf[1024]; -#ifdef WITH_WSREP - if (wsrep_emulate_bin_log) DBUG_RETURN(0); - /* - Clear table maps before writing SAVEPOINT event. This enforces - recreation of table map events for the following row event. - */ - thd->clear_binlog_table_maps(); -#endif /* WITH_WSREP */ + String log_query(buf, sizeof(buf), &my_charset_bin); if (log_query.copy(STRING_WITH_LEN("SAVEPOINT "), &my_charset_bin) || append_identifier(thd, &log_query, @@ -2322,17 +2253,15 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) { DBUG_ENTER("binlog_savepoint_rollback"); + if (wsrep_emulate_bin_log) + DBUG_RETURN(0); + /* Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some non-transactional table. Otherwise, truncate the binlog cache starting from the SAVEPOINT command. */ -#ifdef WITH_WSREP - if (!wsrep_emulate_bin_log && - unlikely(trans_has_updated_non_trans_table(thd) || -#else if (unlikely(trans_has_updated_non_trans_table(thd) || -#endif (thd->variables.option_bits & OPTION_KEEP_LOG))) { char buf[1024]; @@ -2346,10 +2275,20 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) TRUE, FALSE, TRUE, errcode); DBUG_RETURN(mysql_bin_log.write(&qinfo)); } -#ifdef WITH_WSREP - if (!wsrep_emulate_bin_log) -#endif - binlog_trans_log_truncate(thd, *(my_off_t*)sv); + + binlog_trans_log_truncate(thd, *(my_off_t*)sv); + + /* + When a SAVEPOINT is executed inside a stored function/trigger we force the + pending event to be flushed with a STMT_END_F flag and clear the table maps + as well to ensure that following DMLs will have a clean state to start + with. ROLLBACK inside a stored routine has to finalize possibly existing + current row-based pending event with cleaning up table maps. That ensures + that following DMLs will have a clean state to start with. + */ + if (thd->in_sub_stmt) + thd->clear_binlog_table_maps(); + DBUG_RETURN(0); } @@ -2486,13 +2425,13 @@ static void setup_windows_event_source() nonzero if not possible to get unique filename. */ -static int find_uniq_filename(char *name) +static int find_uniq_filename(char *name, ulong next_log_number) { uint i; char buff[FN_REFLEN], ext_buf[FN_REFLEN]; struct st_my_dir *dir_info; reg1 struct fileinfo *file_info; - ulong max_found= 0, next= 0, number= 0; + ulong max_found, next, number; size_t buf_length, length; char *start, *end; int error= 0; @@ -2512,6 +2451,7 @@ static int find_uniq_filename(char *name) DBUG_RETURN(1); } file_info= dir_info->dir_entry; + max_found= next_log_number ? next_log_number-1 : 0; for (i= dir_info->number_of_files ; i-- ; file_info++) { if (strncmp(file_info->name, start, length) == 0 && @@ -2523,7 +2463,7 @@ static int find_uniq_filename(char *name) my_dirend(dir_info); /* check if reached the maximum possible extension number */ - if (max_found == MAX_LOG_UNIQUE_FN_EXT) + if (max_found >= MAX_LOG_UNIQUE_FN_EXT) { sql_print_error("Log filename extension number exhausted: %06lu. \ Please fix this by archiving old logs and \ @@ -2584,14 +2524,18 @@ void MYSQL_LOG::init(enum_log_type log_type_arg, bool MYSQL_LOG::init_and_set_log_file_name(const char *log_name, const char *new_name, + ulong next_log_number, enum_log_type log_type_arg, enum cache_type io_cache_type_arg) { init(log_type_arg, io_cache_type_arg); - if (new_name && !strmov(log_file_name, new_name)) - return TRUE; - else if (!new_name && generate_new_name(log_file_name, log_name)) + if (new_name) + { + strmov(log_file_name, new_name); + } + else if (!new_name && generate_new_name(log_file_name, log_name, + next_log_number)) return TRUE; return FALSE; @@ -2624,7 +2568,8 @@ bool MYSQL_LOG::open( PSI_file_key log_file_key, #endif const char *log_name, enum_log_type log_type_arg, - const char *new_name, enum cache_type io_cache_type_arg) + const char *new_name, ulong next_log_number, + enum cache_type io_cache_type_arg) { char buff[FN_REFLEN]; MY_STAT f_stat; @@ -2643,7 +2588,13 @@ bool MYSQL_LOG::open( goto err; } - if (init_and_set_log_file_name(name, new_name, + /* + log_type is LOG_UNKNOWN if we should not generate a new name + This is only used when called from MYSQL_BINARY_LOG::open, which + has already updated log_file_name. + */ + if (log_type_arg != LOG_UNKNOWN && + init_and_set_log_file_name(name, new_name, next_log_number, log_type_arg, io_cache_type_arg)) goto err; @@ -2798,7 +2749,8 @@ void MYSQL_LOG::cleanup() } -int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name) +int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name, + ulong next_log_number) { fn_format(new_name, log_name, mysql_data_home, "", 4); if (log_type == LOG_BIN) @@ -2806,10 +2758,12 @@ int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name) if (!fn_ext(log_name)[0]) { if (DBUG_EVALUATE_IF("binlog_inject_new_name_error", TRUE, FALSE) || - find_uniq_filename(new_name)) + find_uniq_filename(new_name, next_log_number)) { - if (current_thd) - my_printf_error(ER_NO_UNIQUE_LOGFILE, ER(ER_NO_UNIQUE_LOGFILE), + THD *thd= current_thd; + if (thd) + my_printf_error(ER_NO_UNIQUE_LOGFILE, + ER_THD(thd, ER_NO_UNIQUE_LOGFILE), MYF(ME_FATALERROR), log_name); sql_print_error(ER_DEFAULT(ER_NO_UNIQUE_LOGFILE), log_name); return 1; @@ -2857,7 +2811,7 @@ void MYSQL_QUERY_LOG::reopen_file() #ifdef HAVE_PSI_INTERFACE m_log_file_key, #endif - save_name, log_type, 0, io_cache_type); + save_name, log_type, 0, 0, io_cache_type); my_free(save_name); mysql_mutex_unlock(&LOCK_log); @@ -2892,7 +2846,7 @@ void MYSQL_QUERY_LOG::reopen_file() */ bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host, - uint user_host_len, int thread_id, + uint user_host_len, int thread_id_arg, const char *command_type, uint command_type_len, const char *sql_text, uint sql_text_len) { @@ -2931,7 +2885,7 @@ bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host, goto err; /* command_type, thread_id */ - length= my_snprintf(buff, 32, "%5ld ", (long) thread_id); + length= my_snprintf(buff, 32, "%5ld ", (long) thread_id_arg); if (my_b_write(&log_file, (uchar*) buff, length)) goto err; @@ -3043,12 +2997,16 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time, sprintf(lock_time_buff, "%.6f", ulonglong2double(lock_utime)/1000000.0); if (my_b_printf(&log_file, "# Thread_id: %lu Schema: %s QC_hit: %s\n" \ - "# Query_time: %s Lock_time: %s Rows_sent: %lu Rows_examined: %lu\n", + "# Query_time: %s Lock_time: %s Rows_sent: %lu Rows_examined: %lu\n" \ + "# Rows_affected: %lu\n", (ulong) thd->thread_id, (thd->db ? thd->db : ""), ((thd->query_plan_flags & QPLAN_QC) ? "Yes" : "No"), query_time_buff, lock_time_buff, (ulong) thd->get_sent_row_count(), - (ulong) thd->get_examined_row_count()) == (size_t) -1) + (ulong) thd->get_examined_row_count(), + thd->get_stmt_da()->is_ok() ? + (ulong) thd->get_stmt_da()->affected_rows() : + 0) == (size_t) -1) tmp_errno= errno; if ((thd->variables.log_slow_verbosity & LOG_SLOW_VERBOSITY_QUERY_PLAN) && (thd->query_plan_flags & @@ -3076,7 +3034,7 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time, { StringBuffer<128> buf; DBUG_ASSERT(!thd->free_list); - if (!print_explain_query(thd->lex, thd, &buf)) + if (!print_explain_for_slow_log(thd->lex, thd, &buf)) my_b_printf(&log_file, "%s", buf.c_ptr_safe()); thd->free_items(); } @@ -3140,7 +3098,7 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time, if (! write_error) { write_error= 1; - sql_print_error(ER(ER_ERROR_ON_WRITE), name, tmp_errno); + sql_print_error(ER_THD(thd, ER_ERROR_ON_WRITE), name, tmp_errno); } } } @@ -3155,8 +3113,8 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time, first change fn_format() to cut the file name if it's too long. */ const char *MYSQL_LOG::generate_name(const char *log_name, - const char *suffix, - bool strip_ext, char *buff) + const char *suffix, + bool strip_ext, char *buff) { if (!log_name || !log_name[0]) { @@ -3176,6 +3134,22 @@ const char *MYSQL_LOG::generate_name(const char *log_name, } +/* + Print some additional information about addition/removal of + XID list entries. + TODO: Remove once MDEV-9510 is fixed. +*/ +#ifdef WITH_WSREP +#define WSREP_XID_LIST_ENTRY(X, Y) \ + if (wsrep_debug) \ + { \ + char buf[FN_REFLEN]; \ + strmake(buf, Y->binlog_name, Y->binlog_name_len); \ + WSREP_DEBUG(X, buf, Y->binlog_id); \ + } +#else +#define WSREP_XID_LIST_ENTRY(X, Y) do { } while(0) +#endif MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) :reset_master_pending(0), mark_xid_done_waiting(0), @@ -3240,6 +3214,8 @@ void MYSQL_BIN_LOG::cleanup() */ DBUG_ASSERT(b->xid_count == 0); DBUG_ASSERT(!binlog_xid_count_list.head()); + WSREP_XID_LIST_ENTRY("MYSQL_BIN_LOG::cleanup(): Removing xid_list_entry " + "for %s (%lu)", b); my_free(b); } @@ -3247,6 +3223,7 @@ void MYSQL_BIN_LOG::cleanup() mysql_mutex_destroy(&LOCK_index); mysql_mutex_destroy(&LOCK_xid_list); mysql_mutex_destroy(&LOCK_binlog_background_thread); + mysql_mutex_destroy(&LOCK_binlog_end_pos); mysql_cond_destroy(&update_cond); mysql_cond_destroy(&COND_queue_busy); mysql_cond_destroy(&COND_xid_list); @@ -3292,6 +3269,9 @@ void MYSQL_BIN_LOG::init_pthread_objects() &COND_binlog_background_thread, 0); mysql_cond_init(key_BINLOG_COND_binlog_background_thread_end, &COND_binlog_background_thread_end, 0); + + mysql_mutex_init(m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos, + MY_MUTEX_INIT_SLOW); } @@ -3378,6 +3358,7 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg, bool MYSQL_BIN_LOG::open(const char *log_name, enum_log_type log_type_arg, const char *new_name, + ulong next_log_number, enum cache_type io_cache_type_arg, ulong max_size_arg, bool null_created_arg, @@ -3404,8 +3385,9 @@ bool MYSQL_BIN_LOG::open(const char *log_name, DBUG_RETURN(1); } - if (init_and_set_log_file_name(log_name, new_name, log_type_arg, - io_cache_type_arg)) + /* We need to calculate new log file name for purge to delete old */ + if (init_and_set_log_file_name(log_name, new_name, next_log_number, + log_type_arg, io_cache_type_arg)) { sql_print_error("MYSQL_BIN_LOG::open failed to generate new file name."); DBUG_RETURN(1); @@ -3418,13 +3400,15 @@ bool MYSQL_BIN_LOG::open(const char *log_name, DBUG_EVALUATE_IF("fault_injection_registering_index", 1, 0)) { /** - TODO: although this was introduced to appease valgrind - when injecting emulated faults using fault_injection_registering_index - it may be good to consider what actually happens when - open_purge_index_file succeeds but register or sync fails. - - Perhaps we might need the code below in MYSQL_BIN_LOG::cleanup - for "real life" purposes as well? + TODO: + Although this was introduced to appease valgrind when + injecting emulated faults using + fault_injection_registering_index it may be good to consider + what actually happens when open_purge_index_file succeeds but + register or sync fails. + + Perhaps we might need the code below in MYSQL_LOG_BIN::cleanup + for "real life" purposes as well? */ DBUG_EXECUTE_IF("fault_injection_registering_index", { if (my_b_inited(&purge_index_file)) @@ -3447,7 +3431,9 @@ bool MYSQL_BIN_LOG::open(const char *log_name, #ifdef HAVE_PSI_INTERFACE m_key_file_log, #endif - log_name, log_type_arg, new_name, io_cache_type_arg)) + log_name, + LOG_UNKNOWN, /* Don't generate new name */ + 0, 0, io_cache_type_arg)) { #ifdef HAVE_REPLICATION close_purge_index_file(); @@ -3491,25 +3477,52 @@ bool MYSQL_BIN_LOG::open(const char *log_name, */ if (io_cache_type == WRITE_CACHE) s.flags |= LOG_EVENT_BINLOG_IN_USE_F; - s.checksum_alg= is_relay_log ? - /* relay-log */ - /* inherit master's A descriptor if one has been received */ - (relay_log_checksum_alg= - (relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ? - relay_log_checksum_alg : - /* otherwise use slave's local preference of RL events verification */ - (opt_slave_sql_verify_checksum == 0) ? - (uint8) BINLOG_CHECKSUM_ALG_OFF : (uint8) binlog_checksum_options): - /* binlog */ - (uint8) binlog_checksum_options; + + if (is_relay_log) + { + if (relay_log_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF) + relay_log_checksum_alg= + opt_slave_sql_verify_checksum ? (enum_binlog_checksum_alg) binlog_checksum_options + : BINLOG_CHECKSUM_ALG_OFF; + s.checksum_alg= relay_log_checksum_alg; + } + else + s.checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options; + + crypto.scheme = 0; DBUG_ASSERT(s.checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); if (!s.is_valid()) goto err; s.dont_set_created= null_created_arg; - if (s.write(&log_file)) + if (write_event(&s)) goto err; bytes_written+= s.data_written; + if (encrypt_binlog) + { + uint key_version= encryption_key_get_latest_version(ENCRYPTION_KEY_SYSTEM_DATA); + if (key_version == ENCRYPTION_KEY_VERSION_INVALID) + { + sql_print_error("Failed to enable encryption of binary logs"); + goto err; + } + + if (key_version != ENCRYPTION_KEY_NOT_ENCRYPTED) + { + if (my_random_bytes(crypto.nonce, sizeof(crypto.nonce))) + goto err; + + Start_encryption_log_event sele(1, key_version, crypto.nonce); + sele.checksum_alg= s.checksum_alg; + if (write_event(&sele)) + goto err; + + // Start_encryption_log_event is written, enable the encryption + if (crypto.init(sele.crypto_scheme, key_version)) + goto err; + } + } + if (!is_relay_log) { char buf[FN_REFLEN]; @@ -3549,7 +3562,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, */ Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0); - if (gl_ev.write(&log_file)) + if (write_event(&gl_ev)) goto err; /* Output a binlog checkpoint event at the start of the binlog file. */ @@ -3600,7 +3613,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, flush_io_cache(&log_file); mysql_file_sync(log_file.file, MYF(MY_WME)); DBUG_SUICIDE();); - if (ev.write(&log_file)) + if (write_event(&ev)) goto err; bytes_written+= ev.data_written; } @@ -3632,17 +3645,26 @@ bool MYSQL_BIN_LOG::open(const char *log_name, /* Don't set log_pos in event header */ description_event_for_queue->set_artificial_event(); - if (description_event_for_queue->write(&log_file)) + if (write_event(description_event_for_queue)) goto err; bytes_written+= description_event_for_queue->data_written; } if (flush_io_cache(&log_file) || mysql_file_sync(log_file.file, MYF(MY_WME|MY_SYNC_FILESIZE))) goto err; - mysql_mutex_lock(&LOCK_commit_ordered); - strmake_buf(last_commit_pos_file, log_file_name); - last_commit_pos_offset= my_b_tell(&log_file); - mysql_mutex_unlock(&LOCK_commit_ordered); + + my_off_t offset= my_b_tell(&log_file); + + if (!is_relay_log) + { + /* update binlog_end_pos so that it can be read by after sync hook */ + reset_binlog_end_pos(log_file_name, offset); + + mysql_mutex_lock(&LOCK_commit_ordered); + strmake_buf(last_commit_pos_file, log_file_name); + last_commit_pos_offset= offset; + mysql_mutex_unlock(&LOCK_commit_ordered); + } if (write_file_name_to_index_file) { @@ -3687,9 +3709,13 @@ bool MYSQL_BIN_LOG::open(const char *log_name, /* Remove any initial entries with no pending XIDs. */ while ((b= binlog_xid_count_list.head()) && b->xid_count == 0) { + WSREP_XID_LIST_ENTRY("MYSQL_BIN_LOG::open(): Removing xid_list_entry for " + "%s (%lu)", b); my_free(binlog_xid_count_list.get()); } mysql_cond_broadcast(&COND_xid_list); + WSREP_XID_LIST_ENTRY("MYSQL_BIN_LOG::open(): Adding new xid_list_entry for " + "%s (%lu)", new_xid_list_entry); binlog_xid_count_list.push_back(new_xid_list_entry); mysql_mutex_unlock(&LOCK_xid_list); @@ -3750,6 +3776,7 @@ int MYSQL_BIN_LOG::get_current_log(LOG_INFO* linfo) int MYSQL_BIN_LOG::raw_get_current_log(LOG_INFO* linfo) { + mysql_mutex_assert_owner(&LOCK_log); strmake_buf(linfo->log_file_name, log_file_name); linfo->pos = my_b_tell(&log_file); return 0; @@ -3882,7 +3909,10 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name, error= !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO; break; } - + if (fname[length-1] != '\n') + continue; // Not a log entry + fname[length-1]= 0; // Remove end \n + // extend relative paths and match against full path if (normalize_binlog_name(full_fname, fname, is_relay_log)) { @@ -3893,11 +3923,10 @@ int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name, // if the log entry matches, null string matching anything if (!log_name || - (log_name_len == fname_len-1 && full_fname[log_name_len] == '\n' && + (log_name_len == fname_len && !memcmp(full_fname, full_log_name, log_name_len))) { DBUG_PRINT("info", ("Found log file entry")); - full_fname[fname_len-1]= 0; // remove last \n linfo->index_file_start_offset= offset; linfo->index_file_offset = my_b_tell(&index_file); break; @@ -3982,8 +4011,10 @@ err: The new index file will only contain this file. - @param thd Thread - @param create_new_log 1 if we should start writing to a new log file + @param thd Thread id. This can be zero in case of resetting + relay logs + @param create_new_log 1 if we should start writing to a new log file + @param next_log_number min number of next log file to use, if possible. @note If not called from slave thread, write start event to new log @@ -3994,8 +4025,9 @@ err: 1 error */ -bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log, - rpl_gtid *init_state, uint32 init_state_len) +bool MYSQL_BIN_LOG::reset_logs(THD *thd, bool create_new_log, + rpl_gtid *init_state, uint32 init_state_len, + ulong next_log_number) { LOG_INFO linfo; bool error=0; @@ -4028,9 +4060,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log, mysql_mutex_unlock(&LOCK_xid_list); } - DEBUG_SYNC(thd, "reset_logs_after_set_reset_master_pending"); - if (thd) - ha_reset_logs(thd); + DEBUG_SYNC_C_IF_THD(thd, "reset_logs_after_set_reset_master_pending"); /* We need to get both locks to be sure that no one is trying to write to the index log file. @@ -4045,7 +4075,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log, Without binlog, we cannot XA recover prepared-but-not-committed transactions in engines. So force a commit checkpoint first. - Note that we take and immediately release LOCK_commit_ordered. This has + Note that we take and immediately + release LOCK_after_binlog_sync/LOCK_commit_ordered. This has the effect to ensure that any on-going group commit (in trx_group_commit_leader()) has completed before we request the checkpoint, due to the chaining of LOCK_log and LOCK_commit_ordered in that function. @@ -4056,7 +4087,10 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log, commit_ordered() in the engine of some transaction, and then a crash later would leave such transaction not recoverable. */ + + mysql_mutex_lock(&LOCK_after_binlog_sync); mysql_mutex_lock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_after_binlog_sync); mysql_mutex_unlock(&LOCK_commit_ordered); mark_xids_active(current_binlog_id, 1); @@ -4109,7 +4143,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log, { uint errcode= purge_log_get_error_code(err); sql_print_error("Failed to locate old binlog or relay log files"); - my_message(errcode, ER(errcode), MYF(0)); + my_message(errcode, ER_THD_OR_DEFAULT(thd, errcode), MYF(0)); error= 1; goto err; } @@ -4120,9 +4154,12 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log, { if (my_errno == ENOENT) { - push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, - ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE), - linfo.log_file_name); + if (thd) + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_LOG_PURGE_NO_FILE, + ER_THD(thd, ER_LOG_PURGE_NO_FILE), + linfo.log_file_name); + sql_print_information("Failed to delete file '%s'", linfo.log_file_name); my_errno= 0; @@ -4130,13 +4167,14 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log, } else { - push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, - ER_BINLOG_PURGE_FATAL_ERR, - "a problem with deleting %s; " - "consider examining correspondence " - "of your binlog index file " - "to the actual binlog files", - linfo.log_file_name); + if (thd) + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_BINLOG_PURGE_FATAL_ERR, + "a problem with deleting %s; " + "consider examining correspondence " + "of your binlog index file " + "to the actual binlog files", + linfo.log_file_name); error= 1; goto err; } @@ -4159,9 +4197,11 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log, { if (my_errno == ENOENT) { - push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, - ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE), - index_file_name); + if (thd) + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_LOG_PURGE_NO_FILE, + ER_THD(thd, ER_LOG_PURGE_NO_FILE), + index_file_name); sql_print_information("Failed to delete file '%s'", index_file_name); my_errno= 0; @@ -4169,19 +4209,21 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log, } else { - push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, - ER_BINLOG_PURGE_FATAL_ERR, - "a problem with deleting %s; " - "consider examining correspondence " - "of your binlog index file " - "to the actual binlog files", - index_file_name); + if (thd) + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_BINLOG_PURGE_FATAL_ERR, + "a problem with deleting %s; " + "consider examining correspondence " + "of your binlog index file " + "to the actual binlog files", + index_file_name); error= 1; goto err; } } if (create_new_log && !open_index_file(index_file_name, 0, FALSE)) - if ((error= open(save_name, log_type, 0, io_cache_type, max_size, 0, FALSE))) + if ((error= open(save_name, log_type, 0, next_log_number, + io_cache_type, max_size, 0, FALSE))) goto err; my_free((void *) save_name); @@ -4208,6 +4250,8 @@ err: if (b->binlog_id == current_binlog_id) break; DBUG_ASSERT(b->xid_count == 0); + WSREP_XID_LIST_ENTRY("MYSQL_BIN_LOG::reset_logs(): Removing " + "xid_list_entry for %s (%lu)", b); my_free(binlog_xid_count_list.get()); } mysql_cond_broadcast(&COND_xid_list); @@ -4325,12 +4369,9 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) if((error=find_log_pos(&rli->linfo, rli->event_relay_log_name, 0)) || (error=find_next_log(&rli->linfo, 0))) { - char buff[22]; - sql_print_error("next log error: %d offset: %s log: %s included: %d", - error, - llstr(rli->linfo.index_file_offset,buff), - rli->event_relay_log_name, - included); + sql_print_error("next log error: %d offset: %llu log: %s included: %d", + error, rli->linfo.index_file_offset, + rli->event_relay_log_name, included); goto err; } @@ -4372,14 +4413,9 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) */ if ((errcode= find_log_pos(&rli->linfo, rli->event_relay_log_name, 0))) { - char buff[22]; - if (!error) - error= errcode; - sql_print_error("next log error: %d offset: %s log: %s included: %d", - errcode, - llstr(rli->linfo.index_file_offset,buff), - rli->group_relay_log_name, - included); + sql_print_error("next log error: %d offset: %llu log: %s included: %d", + errcode, rli->linfo.index_file_offset, + rli->group_relay_log_name, included); goto err; } @@ -4655,7 +4691,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space, if (thd) { push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, - ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE), + ER_LOG_PURGE_NO_FILE, ER_THD(thd, ER_LOG_PURGE_NO_FILE), log_info.log_file_name); } sql_print_information("Failed to execute mysql_file_stat on file '%s'", @@ -4713,13 +4749,6 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space, } error= 0; - if (!need_mutex) - { - /* - This is to avoid triggering an error in NDB. - */ - ha_binlog_index_purge_file(current_thd, log_info.log_file_name); - } DBUG_PRINT("info",("purging %s",log_info.log_file_name)); if (!my_delete(log_info.log_file_name, MYF(0))) @@ -4734,7 +4763,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space, if (thd) { push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, - ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE), + ER_LOG_PURGE_NO_FILE, ER_THD(thd, ER_LOG_PURGE_NO_FILE), log_info.log_file_name); } sql_print_information("Failed to delete file '%s'", @@ -4806,7 +4835,6 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time) LOG_INFO log_info; MY_STAT stat_area; THD *thd= current_thd; - DBUG_ENTER("purge_logs_before_date"); mysql_mutex_lock(&LOCK_index); @@ -4872,24 +4900,24 @@ err: bool -MYSQL_BIN_LOG::can_purge_log(const char *log_file_name) +MYSQL_BIN_LOG::can_purge_log(const char *log_file_name_arg) { xid_count_per_binlog *b; - if (is_active(log_file_name)) + if (is_active(log_file_name_arg)) return false; mysql_mutex_lock(&LOCK_xid_list); { I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); while ((b= it++) && - 0 != strncmp(log_file_name+dirname_length(log_file_name), + 0 != strncmp(log_file_name_arg+dirname_length(log_file_name_arg), b->binlog_name, b->binlog_name_len)) ; } mysql_mutex_unlock(&LOCK_xid_list); if (b) return false; - return !log_in_use(log_file_name); + return !log_in_use(log_file_name_arg); } #endif /* HAVE_REPLICATION */ @@ -4945,6 +4973,20 @@ void MYSQL_BIN_LOG::make_log_name(char* buf, const char* log_ident) bool MYSQL_BIN_LOG::is_active(const char *log_file_name_arg) { + /** + * there should/must be mysql_mutex_assert_owner(&LOCK_log) here... + * but code violates this! (scary monsters and super creeps!) + * + * example stacktrace: + * #8 MYSQL_BIN_LOG::is_active + * #9 MYSQL_BIN_LOG::can_purge_log + * #10 MYSQL_BIN_LOG::purge_logs + * #11 MYSQL_BIN_LOG::purge_first_log + * #12 next_event + * #13 exec_relay_log_event + * + * I didn't investigate if this is ligit...(i.e if my comment is wrong) + */ return !strcmp(log_file_name, log_file_name_arg); } @@ -4993,8 +5035,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) char new_name[FN_REFLEN], *new_name_ptr, *old_name, *file_to_open; uint close_flag; bool delay_close= false; - File old_file; - LINT_INIT(old_file); + File UNINIT_VAR(old_file); DBUG_ENTER("MYSQL_BIN_LOG::new_file_impl"); if (need_lock) @@ -5018,7 +5059,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) We have to do this here and not in open as we want to store the new file name in the current binary log file. */ - if ((error= generate_new_name(new_name, name))) + if ((error= generate_new_name(new_name, name, 0))) goto end; new_name_ptr=new_name; @@ -5039,11 +5080,13 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) r.checksum_alg= relay_log_checksum_alg; DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) || - (error= r.write(&log_file))) + (error= write_event(&r))) { DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno=2;); close_on_error= TRUE; - my_printf_error(ER_ERROR_ON_WRITE, ER(ER_CANT_OPEN_FILE), MYF(ME_FATALERROR), name, errno); + my_printf_error(ER_ERROR_ON_WRITE, + ER_THD_OR_DEFAULT(current_thd, ER_CANT_OPEN_FILE), + MYF(ME_FATALERROR), name, errno); goto end; } bytes_written += r.data_written; @@ -5078,14 +5121,15 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) binlog_checksum_options= checksum_alg_reset; } /* - Note that at this point, log_state != LOG_CLOSED (important for is_open()). + Note that at this point, log_state != LOG_CLOSED + (important for is_open()). */ /* new_file() is only used for rotation (in FLUSH LOGS or because size > max_binlog_size or max_relay_log_size). - If this is a binary log, the Format_description_log_event at the beginning of - the new file should have created=0 (to distinguish with the + If this is a binary log, the Format_description_log_event at the + beginning of the new file should have created=0 (to distinguish with the Format_description_log_event written at server startup, which should trigger temp tables deletion on slaves. */ @@ -5097,14 +5141,15 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) { /* reopen the binary log file. */ file_to_open= new_name_ptr; - error= open(old_name, log_type, new_name_ptr, io_cache_type, + error= open(old_name, log_type, new_name_ptr, 0, io_cache_type, max_size, 1, FALSE); } /* handle reopening errors */ if (error) { - my_printf_error(ER_CANT_OPEN_FILE, ER(ER_CANT_OPEN_FILE), + my_printf_error(ER_CANT_OPEN_FILE, + ER_THD_OR_DEFAULT(current_thd, ER_CANT_OPEN_FILE), MYF(ME_FATALERROR), file_to_open, error); close_on_error= TRUE; } @@ -5149,9 +5194,16 @@ end: DBUG_RETURN(error); } +bool MYSQL_BIN_LOG::write_event(Log_event *ev, IO_CACHE *file) +{ + Log_event_writer writer(file, &crypto); + if (crypto.scheme && file == &log_file) + writer.ctx= alloca(crypto.ctx_size); -bool -MYSQL_BIN_LOG::append(Log_event *ev) + return writer.write(ev); +} + +bool MYSQL_BIN_LOG::append(Log_event *ev) { bool res; mysql_mutex_lock(&LOCK_log); @@ -5168,11 +5220,8 @@ bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev) mysql_mutex_assert_owner(&LOCK_log); DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); - /* - Log_event::write() is smart enough to use my_b_write() or - my_b_append() depending on the kind of cache we have. - */ - if (ev->write(&log_file)) + + if (write_event(ev)) { error=1; goto err; @@ -5188,32 +5237,62 @@ err: DBUG_RETURN(error); } - -bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...) +bool MYSQL_BIN_LOG::write_event_buffer(uchar* buf, uint len) { - bool error= 0; - DBUG_ENTER("MYSQL_BIN_LOG::appendv"); - va_list(args); - va_start(args,len); + bool error= 1; + uchar *ebuf= 0; + DBUG_ENTER("MYSQL_BIN_LOG::write_event_buffer"); DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); mysql_mutex_assert_owner(&LOCK_log); - do + + if (crypto.scheme != 0) { - if (my_b_append(&log_file,(uchar*) buf,len)) - { - error= 1; + DBUG_ASSERT(crypto.scheme == 1); + + uint elen; + uchar iv[BINLOG_IV_LENGTH]; + + ebuf= (uchar*)my_safe_alloca(len); + if (!ebuf) goto err; - } - bytes_written += len; - } while ((buf=va_arg(args,const char*)) && (len=va_arg(args,uint))); + + crypto.set_iv(iv, my_b_append_tell(&log_file)); + + /* + we want to encrypt everything, excluding the event length: + massage the data before the encryption + */ + memcpy(buf + EVENT_LEN_OFFSET, buf, 4); + + if (encryption_crypt(buf + 4, len - 4, + ebuf + 4, &elen, + crypto.key, crypto.key_length, iv, sizeof(iv), + ENCRYPTION_FLAG_ENCRYPT | ENCRYPTION_FLAG_NOPAD, + ENCRYPTION_KEY_SYSTEM_DATA, crypto.key_version)) + goto err; + + DBUG_ASSERT(elen == len - 4); + + /* massage the data after the encryption */ + memcpy(ebuf, ebuf + EVENT_LEN_OFFSET, 4); + int4store(ebuf + EVENT_LEN_OFFSET, len); + + buf= ebuf; + } + if (my_b_append(&log_file, buf, len)) + goto err; + bytes_written+= len; + + error= 0; DBUG_PRINT("info",("max_size: %lu",max_size)); if (flush_and_sync(0)) goto err; if (my_b_append_tell(&log_file) > max_size) error= new_file_without_locking(); err: + my_safe_afree(ebuf, len); if (!error) signal_update(); DBUG_RETURN(error); @@ -5424,7 +5503,6 @@ binlog_cache_mngr *THD::binlog_setup_trx_data() DBUG_RETURN(cache_mngr); } - /* Function to start a statement and optionally a transaction for the binary log. @@ -5508,6 +5586,7 @@ binlog_start_consistent_snapshot(handlerton *hton, THD *thd) binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data(); /* Server layer calls us with LOCK_commit_ordered locked, so this is safe. */ + mysql_mutex_assert_owner(&LOCK_commit_ordered); strmake_buf(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file); cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset; @@ -5545,12 +5624,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, is_transactional= 1; /* Pre-conditions */ -#ifdef WITH_WSREP - DBUG_ASSERT(is_current_stmt_binlog_format_row() && - (WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open())); -#else - DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); -#endif + DBUG_ASSERT(is_current_stmt_binlog_format_row()); + DBUG_ASSERT(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()); DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); Table_map_log_event @@ -5564,6 +5639,7 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, IO_CACHE *file= cache_mngr->get_binlog_cache_log(use_trans_cache(this, is_transactional)); + Log_event_writer writer(file); binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(use_trans_cache(this, is_transactional)); @@ -5572,14 +5648,14 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, Annotate_rows_log_event anno(table->in_use, is_transactional, false); /* Annotate event should be written not more than once */ *with_annotate= 0; - if ((error= anno.write(file))) + if ((error= writer.write(&anno))) { if (my_errno == EFBIG) cache_data->set_incident(); DBUG_RETURN(error); } } - if ((error= the_event.write(file))) + if ((error= writer.write(&the_event))) DBUG_RETURN(error); binlog_table_maps++; @@ -5690,11 +5766,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); -#ifdef WITH_WSREP DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()); -#else - DBUG_ASSERT(mysql_bin_log.is_open()); -#endif DBUG_PRINT("enter", ("event: 0x%lx", (long) event)); int error= 0; @@ -5710,14 +5782,14 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, if (Rows_log_event* pending= cache_data->pending()) { - IO_CACHE *file= &cache_data->cache_log; + Log_event_writer writer(&cache_data->cache_log); /* Write pending event to the cache. */ DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending", {DBUG_SET("+d,simulate_file_write_error");}); - if (pending->write(file)) + if (writer.write(pending)) { set_write_error(thd, is_transactional); if (check_write_error(thd) && cache_data && @@ -5746,13 +5818,26 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, bool is_transactional, uint64 commit_id) { rpl_gtid gtid; - uint32 domain_id= thd->variables.gtid_domain_id; - uint32 server_id= thd->variables.server_id; - uint64 seq_no= thd->variables.gtid_seq_no; + uint32 domain_id; + uint32 local_server_id; + uint64 seq_no; int err; DBUG_ENTER("write_gtid_event"); DBUG_PRINT("enter", ("standalone: %d", standalone)); - + +#ifdef WITH_WSREP + if (WSREP(thd) && thd->wsrep_trx_meta.gtid.seqno != -1 && wsrep_gtid_mode) + { + domain_id= wsrep_gtid_domain_id; + } else { +#endif /* WITH_WSREP */ + domain_id= thd->variables.gtid_domain_id; +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ + local_server_id= thd->variables.server_id; + seq_no= thd->variables.gtid_seq_no; + if (thd->variables.option_bits & OPTION_GTID_BEGIN) { DBUG_PRINT("error", ("OPTION_GTID_BEGIN is set. " @@ -5770,7 +5855,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, { /* Use the specified sequence number. */ gtid.domain_id= domain_id; - gtid.server_id= server_id; + gtid.server_id= local_server_id; gtid.seq_no= seq_no; err= rpl_global_gtid_binlog_state.update(>id, opt_gtid_strict_mode); if (err && thd->get_stmt_da()->sql_errno()==ER_GTID_STRICT_OUT_OF_ORDER) @@ -5780,7 +5865,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, { /* Allocate the next sequence number for the GTID. */ err= rpl_global_gtid_binlog_state.update_with_next_gtid(domain_id, - server_id, >id); + local_server_id, >id); seq_no= gtid.seq_no; } if (err) @@ -5792,7 +5877,8 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, commit_id); /* Write the event to the binary log. */ - if (gtid_event.write(&mysql_bin_log.log_file)) + DBUG_ASSERT(this == &mysql_bin_log); + if (write_event(>id_event)) DBUG_RETURN(true); status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written); @@ -5808,7 +5894,7 @@ MYSQL_BIN_LOG::write_state_to_file() char buf[FN_REFLEN]; int err; bool opened= false; - bool inited= false; + bool log_inited= false; fn_format(buf, opt_bin_logname, mysql_data_home, ".state", MY_UNPACK_FILENAME); @@ -5823,10 +5909,10 @@ MYSQL_BIN_LOG::write_state_to_file() if ((err= init_io_cache(&cache, file_no, IO_SIZE, WRITE_CACHE, 0, 0, MYF(MY_WME|MY_WAIT_IF_FULL)))) goto err; - inited= true; + log_inited= true; if ((err= rpl_global_gtid_binlog_state.write_to_iocache(&cache))) goto err; - inited= false; + log_inited= false; if ((err= end_io_cache(&cache))) goto err; if ((err= mysql_file_sync(file_no, MYF(MY_WME|MY_SYNC_FILESIZE)))) @@ -5835,7 +5921,7 @@ MYSQL_BIN_LOG::write_state_to_file() err: sql_print_error("Error writing binlog state to file '%s'.\n", buf); - if (inited) + if (log_inited) end_io_cache(&cache); end: if (opened) @@ -5861,7 +5947,7 @@ MYSQL_BIN_LOG::read_state_from_file() char buf[FN_REFLEN]; int err; bool opened= false; - bool inited= false; + bool log_inited= false; fn_format(buf, opt_bin_logname, mysql_data_home, ".state", MY_UNPACK_FILENAME); @@ -5888,7 +5974,7 @@ MYSQL_BIN_LOG::read_state_from_file() if ((err= init_io_cache(&cache, file_no, IO_SIZE, READ_CACHE, 0, 0, MYF(MY_WME|MY_WAIT_IF_FULL)))) goto err; - inited= true; + log_inited= true; if ((err= rpl_global_gtid_binlog_state.read_from_iocache(&cache))) goto err; goto end; @@ -5896,7 +5982,7 @@ MYSQL_BIN_LOG::read_state_from_file() err: sql_print_error("Error reading binlog GTID state from file '%s'.\n", buf); end: - if (inited) + if (log_inited) end_io_cache(&cache); if (opened) mysql_file_close(file_no, MYF(0)); @@ -5934,11 +6020,11 @@ MYSQL_BIN_LOG::is_empty_state() bool -MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id, +MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id_arg, rpl_gtid *out_gtid) { rpl_gtid *gtid; - if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id))) + if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id_arg))) *out_gtid= *gtid; return gtid != NULL; } @@ -5968,11 +6054,13 @@ MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no) bool -MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id, uint32 server_id, +MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id, + uint32 server_id_arg, uint64 seq_no) { return rpl_global_gtid_binlog_state.check_strict_sequence(domain_id, - server_id, seq_no); + server_id_arg, + seq_no); } @@ -5989,22 +6077,19 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) binlog_cache_data *cache_data= 0; bool is_trans_cache= FALSE; bool using_trans= event_info->use_trans_cache(); - bool direct; - ulong prev_binlog_id; + bool direct= event_info->use_direct_logging(); + ulong UNINIT_VAR(prev_binlog_id); DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)"); - LINT_INIT(prev_binlog_id); -#ifdef WITH_WSREP /* When binary logging is not enabled (--log-bin=0), wsrep-patch partially - enables it without opening the binlog file (MSQL_BIN_LOG::open(). - So, avoid writing directly to binlog file. + enables it without opening the binlog file (MYSQL_BIN_LOG::open(). + So, avoid writing to binlog file. */ - if (wsrep_emulate_bin_log) - direct= false; - else -#endif /* WITH_WSREP */ - direct= event_info->use_direct_logging(); + if (direct && + (wsrep_emulate_bin_log || + (WSREP(thd) && !(thd->variables.option_bits & OPTION_BIN_LOG)))) + DBUG_RETURN(0); if (thd->variables.option_bits & OPTION_GTID_BEGIN) { @@ -6028,10 +6113,17 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) /* We only end the statement if we are in a top-level statement. If we are inside a stored function, we do not end the statement since - this will close all tables on the slave. + this will close all tables on the slave. But there can be a special case + where we are inside a stored function/trigger and a SAVEPOINT is being + set in side the stored function/trigger. This SAVEPOINT execution will + force the pending event to be flushed without an STMT_END_F flag. This + will result in a case where following DMLs will be considered as part of + same statement and result in data loss on slave. Hence in this case we + force the end_stmt to be true. */ - bool const end_stmt= - thd->locked_tables_mode && thd->lex->requires_prelocking(); + bool const end_stmt= (thd->in_sub_stmt && thd->lex->sql_command == + SQLCOM_SAVEPOINT) ? true : + (thd->locked_tables_mode && thd->lex->requires_prelocking()); if (thd->binlog_flush_pending_rows_event(end_stmt, using_trans)) DBUG_RETURN(error); @@ -6040,13 +6132,9 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) mostly called if is_open() *was* true a few instructions before, but it could have changed since. */ -#ifdef WITH_WSREP /* applier and replayer can skip writing binlog events */ - if ((WSREP_EMULATE_BINLOG(thd) && (thd->wsrep_exec_mode != REPL_RECV)) || - is_open()) -#else - if (likely(is_open())) -#endif + if ((WSREP_EMULATE_BINLOG(thd) && + IF_WSREP(thd->wsrep_exec_mode != REPL_RECV, 0)) || is_open()) { my_off_t UNINIT_VAR(my_org_b_tell); #ifdef HAVE_REPLICATION @@ -6056,7 +6144,17 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) binlog_[wild_]{do|ignore}_table?" (WL#1049)" */ const char *local_db= event_info->get_db(); - if ((!(thd->variables.option_bits & OPTION_BIN_LOG)) || + + bool option_bin_log_flag= (thd->variables.option_bits & OPTION_BIN_LOG); + + /* + Log all updates to binlog cache so that they can get replicated to other + nodes. A check has been added to stop them from getting logged into + binary log files. + */ + if (WSREP(thd)) option_bin_log_flag= true; + + if ((!(option_bin_log_flag)) || (thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT && thd->lex->sql_command != SQLCOM_SAVEPOINT && !binlog_filter->db_ok(local_db))) @@ -6078,11 +6176,12 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) prev_binlog_id= current_binlog_id; DBUG_EXECUTE_IF("binlog_force_commit_id", { - const LEX_STRING name= { C_STRING_WITH_LEN("commit_id") }; + const LEX_STRING commit_name= { C_STRING_WITH_LEN("commit_id") }; bool null_value; user_var_entry *entry= (user_var_entry*) my_hash_search(&thd->user_vars, - (uchar*) name.str, name.length); + (uchar*) commit_name.str, + commit_name.length); commit_id= entry->val_int(&null_value); }); if (write_gtid_event(thd, true, using_trans, commit_id)) @@ -6120,7 +6219,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) Annotate_rows_log_event anno(thd, using_trans, direct); /* Annotate event should be written not more than once */ *with_annotate= 0; - if (anno.write(file)) + if (write_event(&anno, file)) goto err; } @@ -6134,7 +6233,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, thd->first_successful_insert_id_in_prev_stmt_for_binlog, using_trans, direct); - if (e.write(file)) + if (write_event(&e, file)) goto err; } if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0) @@ -6145,14 +6244,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT, thd->auto_inc_intervals_in_cur_stmt_for_binlog. minimum(), using_trans, direct); - if (e.write(file)) + if (write_event(&e, file)) goto err; } if (thd->rand_used) { Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2, using_trans, direct); - if (e.write(file)) + if (write_event(&e, file)) goto err; } if (thd->user_var_events.elements) @@ -6176,7 +6275,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) flags, using_trans, direct); - if (e.write(file)) + if (write_event(&e, file)) goto err; } } @@ -6186,7 +6285,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) /* Write the event. */ - if (event_info->write(file) || + if (write_event(event_info, file) || DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0)) goto err; @@ -6204,30 +6303,66 @@ err: if ((error= flush_and_sync(&synced))) { } - else if ((error= RUN_HOOK(binlog_storage, after_flush, - (thd, log_file_name, file->pos_in_file, synced)))) - { - sql_print_error("Failed to run 'after_flush' hooks"); - } else { - signal_update(); - if ((error= rotate(false, &check_purge))) - check_purge= false; + mysql_mutex_assert_not_owner(&LOCK_prepare_ordered); + mysql_mutex_assert_owner(&LOCK_log); + mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); + mysql_mutex_assert_not_owner(&LOCK_commit_ordered); + bool first= true; + bool last= true; + if ((error= RUN_HOOK(binlog_storage, after_flush, + (thd, log_file_name, file->pos_in_file, + synced, first, last)))) + { + sql_print_error("Failed to run 'after_flush' hooks"); + error= 1; + } + else + { + /* update binlog_end_pos so it can be read by dump thread + * + * note: must be _after_ the RUN_HOOK(after_flush) or else + * semi-sync-plugin might not have put the transaction into + * it's list before dump-thread tries to send it + */ + update_binlog_end_pos(offset); + + signal_update(); + if ((error= rotate(false, &check_purge))) + check_purge= false; + } } } status_var_add(thd->status_var.binlog_bytes_written, offset - my_org_b_tell); + mysql_mutex_lock(&LOCK_after_binlog_sync); + mysql_mutex_unlock(&LOCK_log); + + mysql_mutex_assert_not_owner(&LOCK_prepare_ordered); + mysql_mutex_assert_not_owner(&LOCK_log); + mysql_mutex_assert_owner(&LOCK_after_binlog_sync); + mysql_mutex_assert_not_owner(&LOCK_commit_ordered); + bool first= true; + bool last= true; + if (RUN_HOOK(binlog_storage, after_sync, + (thd, log_file_name, file->pos_in_file, + first, last))) + { + error=1; + /* error is already printed inside hook */ + } + /* Take mutex to protect against a reader seeing partial writes of 64-bit offset on 32-bit CPUs. */ mysql_mutex_lock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_after_binlog_sync); last_commit_pos_offset= offset; mysql_mutex_unlock(&LOCK_commit_ordered); - mysql_mutex_unlock(&LOCK_log); if (check_purge) checkpoint_and_purge(prev_binlog_id); @@ -6397,15 +6532,15 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge) { int error= 0; DBUG_ENTER("MYSQL_BIN_LOG::rotate"); -#ifdef WITH_WSREP - if (WSREP_ON && wsrep_to_isolation) - { - *check_purge= false; - WSREP_DEBUG("avoiding binlog rotate due to TO isolation: %d", - wsrep_to_isolation); - DBUG_RETURN(0); - } -#endif + + if (wsrep_to_isolation) + { + DBUG_ASSERT(WSREP_ON); + *check_purge= false; + WSREP_DEBUG("avoiding binlog rotate due to TO isolation: %d", + wsrep_to_isolation); + DBUG_RETURN(0); + } //todo: fix the macro def and restore safe_mutex_assert_owner(&LOCK_log); *check_purge= false; @@ -6529,33 +6664,35 @@ uint MYSQL_BIN_LOG::next_file_id() return res; } +class CacheWriter: public Log_event_writer +{ +public: + ulong remains; -/** - Calculate checksum of possibly a part of an event containing at least - the whole common header. - - @param buf the pointer to trans cache's buffer - @param off the offset of the beginning of the event in the buffer - @param event_len no-checksum length of the event - @param length the current size of the buffer - - @param crc [in-out] the checksum + CacheWriter(THD *thd_arg, IO_CACHE *file_arg, bool do_checksum, + Binlog_crypt_data *cr) + : Log_event_writer(file_arg, cr), remains(0), thd(thd_arg), first(true) + { checksum_len= do_checksum ? BINLOG_CHECKSUM_LEN : 0; } - Event size in incremented by @c BINLOG_CHECKSUM_LEN. + ~CacheWriter() + { status_var_add(thd->status_var.binlog_bytes_written, bytes_written); } - @return 0 or number of unprocessed yet bytes of the event excluding - the checksum part. -*/ - static ulong fix_log_event_crc(uchar *buf, uint off, uint event_len, - uint length, ha_checksum *crc) -{ - ulong ret; - uchar *event_begin= buf + off; + int write(uchar* pos, size_t len) + { + if (first) + write_header(pos, len); + else + write_data(pos, len); - ret= length >= off + event_len ? 0 : off + event_len - length; - *crc= my_checksum(*crc, event_begin, event_len - ret); - return ret; -} + remains -= len; + if ((first= !remains)) + write_footer(); + return 0; + } +private: + THD *thd; + bool first; +}; /* Write the contents of a cache to the binary log. @@ -6576,21 +6713,22 @@ uint MYSQL_BIN_LOG::next_file_id() int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) { + DBUG_ENTER("MYSQL_BIN_LOG::write_cache"); + mysql_mutex_assert_owner(&LOCK_log); if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; - ulong remains= 0; // part of unprocessed yet netto length of the event long val; ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t uchar header[LOG_EVENT_HEADER_LEN]; - ha_checksum crc= 0, crc_0= 0; - my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF); - uchar buf[BINLOG_CHECKSUM_LEN]; - DBUG_ENTER("MYSQL_BIN_LOG::write_cache"); + CacheWriter writer(thd, &log_file, binlog_checksum_options, &crypto); + + if (crypto.scheme) + writer.ctx= alloca(crypto.ctx_size); // while there is just one alg the following must hold: - DBUG_ASSERT(!do_checksum || + DBUG_ASSERT(binlog_checksum_options == BINLOG_CHECKSUM_ALG_OFF || binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32); /* @@ -6609,7 +6747,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) group= (uint)my_b_tell(&log_file); hdr_offs= carry= 0; - + do { /* @@ -6619,53 +6757,40 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) if (unlikely(carry > 0)) { DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN); + uint tail= LOG_EVENT_HEADER_LEN - carry; /* assemble both halves */ - memcpy(&header[carry], (char *)cache->read_pos, - LOG_EVENT_HEADER_LEN - carry); + memcpy(&header[carry], (char *)cache->read_pos, tail); + + ulong len= uint4korr(header + EVENT_LEN_OFFSET); + writer.remains= len; /* fix end_log_pos */ - val= uint4korr(&header[LOG_POS_OFFSET]) + group + - (end_log_pos_inc+= (do_checksum ? BINLOG_CHECKSUM_LEN : 0)); - int4store(&header[LOG_POS_OFFSET], val); + end_log_pos_inc += writer.checksum_len; + val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc; + int4store(header + LOG_POS_OFFSET, val); - if (do_checksum) - { - ulong len= uint4korr(&header[EVENT_LEN_OFFSET]); - /* fix len */ - int4store(&header[EVENT_LEN_OFFSET], len + BINLOG_CHECKSUM_LEN); - } + /* fix len */ + len+= writer.checksum_len; + int4store(header + EVENT_LEN_OFFSET, len); - /* write the first half of the split header */ - if (my_b_write(&log_file, header, carry)) + if (writer.write(header, LOG_EVENT_HEADER_LEN)) DBUG_RETURN(ER_ERROR_ON_WRITE); - status_var_add(thd->status_var.binlog_bytes_written, carry); - /* - copy fixed second half of header to cache so the correct - version will be written later. - */ - memcpy((char *)cache->read_pos, &header[carry], - LOG_EVENT_HEADER_LEN - carry); + cache->read_pos+= tail; + length-= tail; + carry= 0; /* next event header at ... */ - hdr_offs= uint4korr(&header[EVENT_LEN_OFFSET]) - carry - - (do_checksum ? BINLOG_CHECKSUM_LEN : 0); - - if (do_checksum) - { - DBUG_ASSERT(crc == crc_0 && remains == 0); - crc= my_checksum(crc, header, carry); - remains= uint4korr(header + EVENT_LEN_OFFSET) - carry - - BINLOG_CHECKSUM_LEN; - } - carry= 0; + hdr_offs= len - LOG_EVENT_HEADER_LEN - writer.checksum_len; } /* if there is anything to write, process it. */ if (likely(length > 0)) { + DBUG_EXECUTE_IF("fail_binlog_write_1", + errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE);); /* process all event-headers in this (partial) cache. if next header is beyond current read-buffer, @@ -6673,52 +6798,28 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) very next iteration, just "eventually"). */ - /* crc-calc the whole buffer */ - if (do_checksum && hdr_offs >= length) + if (hdr_offs >= length) { - - DBUG_ASSERT(remains != 0 && crc != crc_0); - - crc= my_checksum(crc, cache->read_pos, length); - remains -= length; - if (my_b_write(&log_file, cache->read_pos, length)) + if (writer.write(cache->read_pos, length)) DBUG_RETURN(ER_ERROR_ON_WRITE); - if (remains == 0) - { - int4store(buf, crc); - if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - crc= crc_0; - } } while (hdr_offs < length) { /* - partial header only? save what we can get, process once - we get the rest. + finish off with remains of the last event that crawls + from previous into the current buffer */ - - if (do_checksum) + if (writer.remains != 0) { - if (remains != 0) - { - /* - finish off with remains of the last event that crawls - from previous into the current buffer - */ - DBUG_ASSERT(crc != crc_0); - crc= my_checksum(crc, cache->read_pos, hdr_offs); - int4store(buf, crc); - remains -= hdr_offs; - DBUG_ASSERT(remains == 0); - if (my_b_write(&log_file, cache->read_pos, hdr_offs) || - my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - crc= crc_0; - } + if (writer.write(cache->read_pos, hdr_offs)) + DBUG_RETURN(ER_ERROR_ON_WRITE); } + /* + partial header only? save what we can get, process once + we get the rest. + */ if (hdr_offs + LOG_EVENT_HEADER_LEN > length) { carry= length - hdr_offs; @@ -6729,37 +6830,25 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) { /* we've got a full event-header, and it came in one piece */ uchar *ev= (uchar *)cache->read_pos + hdr_offs; - uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len + uint ev_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len uchar *log_pos= ev + LOG_POS_OFFSET; + end_log_pos_inc += writer.checksum_len; /* fix end_log_pos */ - val= uint4korr(log_pos) + group + - (end_log_pos_inc += (do_checksum ? BINLOG_CHECKSUM_LEN : 0)); + val= uint4korr(log_pos) + group + end_log_pos_inc; int4store(log_pos, val); - /* fix CRC */ - if (do_checksum) - { - /* fix length */ - int4store(ev + EVENT_LEN_OFFSET, event_len + BINLOG_CHECKSUM_LEN); - remains= fix_log_event_crc(cache->read_pos, hdr_offs, event_len, - length, &crc); - if (my_b_write(&log_file, ev, - remains == 0 ? event_len : length - hdr_offs)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - if (remains == 0) - { - int4store(buf, crc); - if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - crc= crc_0; // crc is complete - } - } + /* fix length */ + int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len); + + writer.remains= ev_len; + if (writer.write(ev, std::min<uint>(ev_len, length - hdr_offs))) + DBUG_RETURN(ER_ERROR_ON_WRITE); /* next event header at ... */ - hdr_offs += event_len; // incr by the netto len + hdr_offs += ev_len; // incr by the netto len - DBUG_ASSERT(!do_checksum || remains == 0 || hdr_offs >= length); + DBUG_ASSERT(!writer.checksum_len || writer.remains == 0 || hdr_offs >= length); } } @@ -6773,21 +6862,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) */ hdr_offs -= length; } - - /* Write data to the binary log file */ - DBUG_EXECUTE_IF("fail_binlog_write_1", - errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE);); - if (!do_checksum) - if (my_b_write(&log_file, cache->read_pos, length)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - status_var_add(thd->status_var.binlog_bytes_written, length); - - cache->read_pos=cache->read_end; // Mark buffer used up } while ((length= my_b_fill(cache))); DBUG_ASSERT(carry == 0); - DBUG_ASSERT(!do_checksum || remains == 0); - DBUG_ASSERT(!do_checksum || crc == crc_0); + DBUG_ASSERT(!writer.checksum_len || writer.remains == 0); DBUG_RETURN(0); // All OK } @@ -6832,7 +6910,7 @@ bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd) if (likely(is_open())) { - error= ev.write(&log_file); + error= write_event(&ev); status_var_add(thd->status_var.binlog_bytes_written, ev.data_written); } @@ -6861,6 +6939,9 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) } offset= my_b_tell(&log_file); + + update_binlog_end_pos(offset); + /* Take mutex to protect against a reader seeing partial writes of 64-bit offset on 32-bit CPUs. @@ -6882,17 +6963,16 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) } void -MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name, - uint len) +MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name_arg, uint len) { my_off_t offset; - Binlog_checkpoint_log_event ev(name, len); + Binlog_checkpoint_log_event ev(name_arg, len); /* Note that we must sync the binlog checkpoint to disk. Otherwise a subsequent log purge could delete binlogs that XA recovery thinks are needed (even though they are not really). */ - if (!ev.write(&log_file) && !flush_and_sync(0)) + if (!write_event(&ev) && !flush_and_sync(0)) { signal_update(); } @@ -6910,6 +6990,9 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name, } offset= my_b_tell(&log_file); + + update_binlog_end_pos(offset); + /* Take mutex to protect against a reader seeing partial writes of 64-bit offset on 32-bit CPUs. @@ -6955,13 +7038,14 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, Ha_trx_info *ha_info; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); -#ifdef WITH_WSREP /* Control should not be allowed beyond this point in wsrep_emulate_bin_log - mode. + mode. Also, do not write the cached updates to binlog if binary logging is + disabled (log-bin/sql_log_bin). */ - if (wsrep_emulate_bin_log) DBUG_RETURN(0); -#endif /* WITH_WSREP */ + if (wsrep_emulate_bin_log || !(thd->variables.option_bits & OPTION_BIN_LOG)) + DBUG_RETURN(0); + entry.thd= thd; entry.cache_mngr= cache_mngr; entry.error= 0; @@ -7119,7 +7203,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) wfc->wakeup_error= orig_entry->thd->killed_errno(); if (!wfc->wakeup_error) wfc->wakeup_error= ER_QUERY_INTERRUPTED; - my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0)); + my_message(wfc->wakeup_error, + ER_THD(orig_entry->thd, wfc->wakeup_error), MYF(0)); DBUG_RETURN(-1); } } @@ -7432,10 +7517,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) group_commit_entry *current, *last_in_queue; group_commit_entry *queue= NULL; bool check_purge= false; - ulong binlog_id; + ulong UNINIT_VAR(binlog_id); uint64 commit_id; DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader"); - LINT_INIT(binlog_id); { DBUG_EXECUTE_IF("inject_binlog_commit_before_get_LOCK_log", @@ -7487,11 +7571,12 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) commit_id= (last_in_queue == leader ? 0 : (uint64)leader->thd->query_id); DBUG_EXECUTE_IF("binlog_force_commit_id", { - const LEX_STRING name= { C_STRING_WITH_LEN("commit_id") }; + const LEX_STRING commit_name= { C_STRING_WITH_LEN("commit_id") }; bool null_value; user_var_entry *entry= (user_var_entry*) my_hash_search(&leader->thd->user_vars, - (uchar*) name.str, name.length); + (uchar*) commit_name.str, + commit_name.length); commit_id= entry->val_int(&null_value); }); /* @@ -7558,12 +7643,21 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) { bool any_error= false; bool all_error= true; + + mysql_mutex_assert_not_owner(&LOCK_prepare_ordered); + mysql_mutex_assert_owner(&LOCK_log); + mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); + mysql_mutex_assert_not_owner(&LOCK_commit_ordered); + bool first= true, last; for (current= queue; current != NULL; current= current->next) { + last= current->next == NULL; if (!current->error && RUN_HOOK(binlog_storage, after_flush, - (current->thd, log_file_name, - current->cache_mngr->last_commit_pos_offset, synced))) + (current->thd, + current->cache_mngr->last_commit_pos_file, + current->cache_mngr->last_commit_pos_offset, synced, + first, last))) { current->error= ER_ERROR_ON_WRITE; current->commit_errno= -1; @@ -7572,8 +7666,17 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) } else all_error= false; + first= false; } + /* update binlog_end_pos so it can be read by dump thread + * + * note: must be _after_ the RUN_HOOK(after_flush) or else + * semi-sync-plugin might not have put the transaction into + * it's list before dump-thread tries to send it + */ + update_binlog_end_pos(commit_offset); + if (any_error) sql_print_error("Failed to run 'after_flush' hooks"); if (!all_error) @@ -7614,18 +7717,54 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) commit_offset= my_b_write_tell(&log_file); } - DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered"); - mysql_mutex_lock(&LOCK_commit_ordered); - last_commit_pos_offset= commit_offset; + DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_after_binlog_sync"); + mysql_mutex_lock(&LOCK_after_binlog_sync); /* - We cannot unlock LOCK_log until we have locked LOCK_commit_ordered; + We cannot unlock LOCK_log until we have locked LOCK_after_binlog_sync; otherwise scheduling could allow the next group commit to run ahead of us, messing up the order of commit_ordered() calls. But as soon as - LOCK_commit_ordered is obtained, we can let the next group commit start. + LOCK_after_binlog_sync is obtained, we can let the next group commit start. */ mysql_mutex_unlock(&LOCK_log); DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log"); + + /* + Loop through threads and run the binlog_sync hook + */ + { + mysql_mutex_assert_not_owner(&LOCK_prepare_ordered); + mysql_mutex_assert_not_owner(&LOCK_log); + mysql_mutex_assert_owner(&LOCK_after_binlog_sync); + mysql_mutex_assert_not_owner(&LOCK_commit_ordered); + + bool first= true, last; + for (current= queue; current != NULL; current= current->next) + { + last= current->next == NULL; + if (!current->error && + RUN_HOOK(binlog_storage, after_sync, + (current->thd, current->cache_mngr->last_commit_pos_file, + current->cache_mngr->last_commit_pos_offset, + first, last))) + { + /* error is already printed inside hook */ + } + first= false; + } + } + + DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered"); + mysql_mutex_lock(&LOCK_commit_ordered); + last_commit_pos_offset= commit_offset; + + /* + Unlock LOCK_after_binlog_sync only *after* LOCK_commit_ordered has been + acquired so that groups can not reorder for the different stages of + the group commit procedure. + */ + mysql_mutex_unlock(&LOCK_after_binlog_sync); + DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_after_binlog_sync"); ++num_group_commits; if (!opt_optimize_thread_scheduling) @@ -7741,7 +7880,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, DBUG_RETURN(ER_ERROR_ON_WRITE); }); - if (entry->end_event->write(&log_file)) + if (write_event(entry->end_event)) { entry->error_cache= NULL; DBUG_RETURN(ER_ERROR_ON_WRITE); @@ -7751,7 +7890,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, if (entry->incident_event) { - if (entry->incident_event->write(&log_file)) + if (write_event(entry->incident_event)) { entry->error_cache= NULL; DBUG_RETURN(ER_ERROR_ON_WRITE); @@ -7920,6 +8059,7 @@ void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd) PSI_stage_info old_stage; DBUG_ENTER("wait_for_update_relay_log"); + mysql_mutex_assert_owner(&LOCK_log); thd->ENTER_COND(&update_cond, &LOCK_log, &stage_slave_has_read_all_relay_log, &old_stage); @@ -7951,6 +8091,7 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd, DBUG_ENTER("wait_for_update_bin_log"); thd_wait_begin(thd, THD_WAIT_BINLOG); + mysql_mutex_assert_owner(&LOCK_log); if (!timeout) mysql_cond_wait(&update_cond, &LOCK_log); else @@ -7960,6 +8101,23 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd, DBUG_RETURN(ret); } +int MYSQL_BIN_LOG::wait_for_update_binlog_end_pos(THD* thd, + struct timespec *timeout) +{ + int ret= 0; + DBUG_ENTER("wait_for_update_binlog_end_pos"); + + thd_wait_begin(thd, THD_WAIT_BINLOG); + mysql_mutex_assert_owner(get_binlog_end_pos_lock()); + if (!timeout) + mysql_cond_wait(&update_cond, get_binlog_end_pos_lock()); + else + ret= mysql_cond_timedwait(&update_cond, get_binlog_end_pos_lock(), + timeout); + thd_wait_end(thd); + DBUG_RETURN(ret); +} + /** Close the log file. @@ -7993,11 +8151,11 @@ void MYSQL_BIN_LOG::close(uint exiting) { Stop_log_event s; // the checksumming rule for relay-log case is similar to Rotate - s.checksum_alg= is_relay_log ? - (uint8) relay_log_checksum_alg : (uint8) binlog_checksum_options; + s.checksum_alg= is_relay_log ? relay_log_checksum_alg + : (enum_binlog_checksum_alg)binlog_checksum_options; DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); - s.write(&log_file); + write_event(&s); bytes_written+= s.data_written; signal_update(); @@ -8055,7 +8213,8 @@ void MYSQL_BIN_LOG::close(uint exiting) if (mysql_file_close(index_file.file, MYF(0)) < 0 && ! write_error) { write_error= 1; - sql_print_error(ER(ER_ERROR_ON_WRITE), index_file_name, errno); + sql_print_error(ER_THD_OR_DEFAULT(current_thd, ER_ERROR_ON_WRITE), + index_file_name, errno); } } log_state= (exiting & LOG_CLOSE_TO_BE_OPENED) ? LOG_TO_BE_OPENED : LOG_CLOSED; @@ -8282,7 +8441,8 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer, Add tag for slaves so that the user can see from which connection the error originates. */ - tag_length= my_snprintf(tag, sizeof(tag), ER(ER_MASTER_LOG_PREFIX), + tag_length= my_snprintf(tag, sizeof(tag), + ER_THD(thd, ER_MASTER_LOG_PREFIX), (int) thd->connection_name.length, thd->connection_name.str); } @@ -8294,13 +8454,14 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer, localtime_r(&skr, &tm_tmp); start=&tm_tmp; - fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d [%s] %.*s%.*s\n", - start->tm_year % 100, + fprintf(stderr, "%d-%02d-%02d %2d:%02d:%02d %lu [%s] %.*s%.*s\n", + start->tm_year + 1900, start->tm_mon+1, start->tm_mday, start->tm_hour, start->tm_min, start->tm_sec, + (unsigned long) pthread_self(), (level == ERROR_LEVEL ? "ERROR" : level == WARNING_LEVEL ? "Warning" : "Note"), tag_length, tag, @@ -8377,6 +8538,9 @@ void sql_print_information(const char *format, ...) va_list args; DBUG_ENTER("sql_print_information"); + if (disable_log_notes) + DBUG_VOID_RETURN; // Skip notes during start/shutdown + va_start(args, format); error_log_print(INFORMATION_LEVEL, format, args); va_end(args); @@ -8426,8 +8590,7 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, { int cookie; struct commit_entry entry; - bool is_group_commit_leader; - LINT_INIT(is_group_commit_leader); + bool UNINIT_VAR(is_group_commit_leader); if (need_prepare_ordered) { @@ -9075,12 +9238,10 @@ int TC_LOG_MMAP::recover() the first byte after magic signature is set to current number of storage engines on startup */ - if (data[sizeof(tc_log_magic)] != total_ha_2pc) + if (data[sizeof(tc_log_magic)] > total_ha_2pc) { sql_print_error("Recovery failed! You must enable " - "exactly %d storage engines that support " - "two-phase commit protocol", - data[sizeof(tc_log_magic)]); + "all engines that were enabled at the moment of the crash"); goto err1; } @@ -9163,7 +9324,7 @@ int TC_LOG_BINLOG::open(const char *opt_name) { mysql_mutex_lock(&LOCK_log); /* generate a new binlog to mask a corrupted one */ - open(opt_name, LOG_BIN, 0, WRITE_CACHE, max_binlog_size, 0, TRUE); + open(opt_name, LOG_BIN, 0, 0, WRITE_CACHE, max_binlog_size, 0, TRUE); mysql_mutex_unlock(&LOCK_log); cleanup(); return 1; @@ -9193,14 +9354,10 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data(); if (!cache_mngr) -#ifdef WITH_WSREP { WSREP_DEBUG("Skipping empty log_xid: %s", thd->query()); DBUG_RETURN(0); } -#else - DBUG_RETURN(0); -#endif /* WITH_WSREP */ cache_mngr->using_xa= TRUE; cache_mngr->xa_xid= xid; @@ -9355,6 +9512,8 @@ TC_LOG_BINLOG::mark_xid_done(ulong binlog_id, bool write_checkpoint) DBUG_ASSERT(b); if (b->binlog_id == current || b->xid_count > 0) break; + WSREP_XID_LIST_ENTRY("TC_LOG_BINLOG::mark_xid_done(): Removing " + "xid_list_entry for %s (%lu)", b); my_free(binlog_xid_count_list.get()); } @@ -9506,9 +9665,7 @@ binlog_background_thread(void *arg __attribute__((unused))) THD_STAGE_INFO(thd, stage_binlog_stopping_background_thread); - mysql_mutex_lock(&LOCK_thread_count); delete thd; - mysql_mutex_unlock(&LOCK_thread_count); my_thread_end(); @@ -9668,6 +9825,13 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, break; #endif + case START_ENCRYPTION_EVENT: + { + if (fdle->start_decryption((Start_encryption_log_event*) ev)) + goto err2; + } + break; + default: /* Nothing. */ break; @@ -9744,6 +9908,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, sql_print_error("Error reading binlog files during recovery. Aborting."); goto err2; } + fdle->reset_crypto(); } if (do_xa) @@ -9935,15 +10100,14 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var, { ulong value= *((ulong *)save); bool check_purge= false; - ulong prev_binlog_id; - LINT_INIT(prev_binlog_id); + ulong UNINIT_VAR(prev_binlog_id); mysql_mutex_lock(mysql_bin_log.get_log_lock()); if(mysql_bin_log.is_open()) { prev_binlog_id= mysql_bin_log.current_binlog_id; if (binlog_checksum_options != value) - mysql_bin_log.checksum_alg_reset= (uint8) value; + mysql_bin_log.checksum_alg_reset= (enum_binlog_checksum_alg)value; if (mysql_bin_log.rotate(true, &check_purge)) check_purge= false; } @@ -9990,8 +10154,7 @@ static MYSQL_SYSVAR_ENUM( binlog_checksum_options, PLUGIN_VAR_RQCMDARG, "Type of BINLOG_CHECKSUM_ALG. Include checksum for " - "log events in the binary log. Possible values are NONE and CRC32; " - "default is NONE.", + "log events in the binary log", NULL, binlog_checksum_update, BINLOG_CHECKSUM_ALG_OFF, @@ -10076,3 +10239,58 @@ maria_declare_plugin(binlog) MariaDB_PLUGIN_MATURITY_STABLE /* maturity */ } maria_declare_plugin_end; + +#ifdef WITH_WSREP +IO_CACHE * get_trans_log(THD * thd) +{ + DBUG_ASSERT(binlog_hton->slot != HA_SLOT_UNDEF); + binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*) + thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) + return cache_mngr->get_binlog_cache_log(true); + + WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id); + return NULL; +} + + +bool wsrep_trans_cache_is_empty(THD *thd) +{ + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + return (!cache_mngr || cache_mngr->trx_cache.empty()); +} + + +void thd_binlog_trx_reset(THD * thd) +{ + /* + todo: fix autocommit select to not call the caller + */ + if (thd_get_ha_data(thd, binlog_hton) != NULL) + { + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) + { + cache_mngr->reset(false, true); + if (!cache_mngr->stmt_cache.empty()) + { + WSREP_DEBUG("pending events in stmt cache, sql: %s", thd->query()); + cache_mngr->stmt_cache.reset(); + } + } + } + thd->clear_binlog_table_maps(); +} + + +void thd_binlog_rollback_stmt(THD * thd) +{ + WSREP_DEBUG("thd_binlog_rollback_stmt :%ld", thd->thread_id); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) + cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); +} +#endif /* WITH_WSREP */ |