Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 2876
1 file changed, 2475 insertions, 401 deletions
diff --git a/sql/log.cc b/sql/log.cc index 54b2975349c..e1886b2572f 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -25,7 +25,7 @@ Abort logging when we get an error in reading or writing log files */ -#include "my_global.h" /* NO_EMBEDDED_ACCESS_CHECKS */ +#include <my_global.h> /* NO_EMBEDDED_ACCESS_CHECKS */ #include "sql_priv.h" #include "log.h" #include "sql_base.h" // open_log_table @@ -40,6 +40,7 @@ #include "rpl_rli.h" #include "sql_audit.h" #include "log_slow.h" +#include "mysqld.h" #include <my_dir.h> #include <stdarg.h> @@ -53,6 +54,7 @@ #include "rpl_handler.h" #include "debug_sync.h" #include "sql_show.h" +#include "my_pthread.h" /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 @@ -71,6 +73,8 @@ static int binlog_init(void *p); static int binlog_close_connection(handlerton *hton, THD *thd); static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv); static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv); +static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton, + THD *thd); static int binlog_commit(handlerton *hton, THD *thd, bool all); static int binlog_rollback(handlerton *hton, THD *thd, bool all); static int binlog_prepare(handlerton *hton, THD *thd, bool all); @@ -86,10 +90,14 @@ ulong opt_binlog_dbug_fsync_sleep= 0; #endif mysql_mutex_t LOCK_prepare_ordered; +mysql_cond_t COND_prepare_ordered; mysql_mutex_t LOCK_commit_ordered; static ulonglong binlog_status_var_num_commits; static ulonglong binlog_status_var_num_group_commits; +static ulonglong binlog_status_group_commit_trigger_count; +static ulonglong binlog_status_group_commit_trigger_lock_wait; +static ulonglong binlog_status_group_commit_trigger_timeout; static char binlog_snapshot_file[FN_REFLEN]; static ulonglong binlog_snapshot_position; @@ -99,6 +107,12 @@ static SHOW_VAR binlog_status_vars_detail[]= (char *)&binlog_status_var_num_commits, SHOW_LONGLONG}, {"group_commits", (char *)&binlog_status_var_num_group_commits, SHOW_LONGLONG}, + {"group_commit_trigger_count", + (char *)&binlog_status_group_commit_trigger_count, SHOW_LONGLONG}, + {"group_commit_trigger_lock_wait", + (char *)&binlog_status_group_commit_trigger_lock_wait, SHOW_LONGLONG}, + {"group_commit_trigger_timeout", + (char *)&binlog_status_group_commit_trigger_timeout, SHOW_LONGLONG}, {"snapshot_file", (char *)&binlog_snapshot_file, SHOW_CHAR}, {"snapshot_position", @@ -106,6 +120,18 @@ static SHOW_VAR binlog_status_vars_detail[]= {NullS, NullS, SHOW_LONG} }; +/* + Variables for the binlog background thread. + Protected by the MYSQL_BIN_LOG::LOCK_binlog_background_thread mutex. 
+ */ +static bool binlog_background_thread_started= false; +static bool binlog_background_thread_stop= false; +static MYSQL_BIN_LOG::xid_count_per_binlog * + binlog_background_thread_queue= NULL; + +static bool start_binlog_background_thread(); + +static rpl_binlog_state rpl_global_gtid_binlog_state; /** purge logs, master and slave sides both, related error code @@ -157,9 +183,9 @@ public: virtual bool handle_condition(THD *thd, uint sql_errno, const char* sql_state, - MYSQL_ERROR::enum_warning_level level, + Sql_condition::enum_warning_level level, const char* msg, - MYSQL_ERROR ** cond_hdl); + Sql_condition ** cond_hdl); const char *message() const { return m_message; } }; @@ -167,9 +193,9 @@ bool Silence_log_table_errors::handle_condition(THD *, uint, const char*, - MYSQL_ERROR::enum_warning_level, + Sql_condition::enum_warning_level, const char* msg, - MYSQL_ERROR ** cond_hdl) + Sql_condition ** cond_hdl) { *cond_hdl= NULL; strmake_buf(m_message, msg); @@ -482,6 +508,14 @@ public: */ bool using_xa; my_xid xa_xid; + bool need_unlog; + /* + Id of binlog that transaction was written to; only needed if need_unlog is + true. + */ + ulong binlog_id; + /* Set if we get an error during commit that must be returned from unlog(). */ + bool delayed_error; private: @@ -505,35 +539,51 @@ bool LOGGER::is_log_table_enabled(uint log_table_type) } -/* Check if a given table is opened log table */ -int check_if_log_table(size_t db_len, const char *db, size_t table_name_len, - const char *table_name, bool check_if_opened) +/** + Check if a given table is opened log table + + @param table Table to check + @param check_if_opened Only fail if it's a log table in use + @param error_msg String to put in error message if not ok. + No error message if 0 + @return 0 ok + @return # Type of log file + */ + +int check_if_log_table(const TABLE_LIST *table, + bool check_if_opened, + const char *error_msg) { - if (db_len == 5 && - !(lower_case_table_names ? - my_strcasecmp(system_charset_info, db, "mysql") : - strcmp(db, "mysql"))) + int result= 0; + if (table->db_length == 5 && + !my_strcasecmp(table_alias_charset, table->db, "mysql")) { - if (table_name_len == 11 && !(lower_case_table_names ? - my_strcasecmp(system_charset_info, - table_name, "general_log") : - strcmp(table_name, "general_log"))) + const char *table_name= table->table_name; + + if (table->table_name_length == 11 && + !my_strcasecmp(table_alias_charset, table_name, "general_log")) { - if (!check_if_opened || logger.is_log_table_enabled(QUERY_LOG_GENERAL)) - return QUERY_LOG_GENERAL; - return 0; + result= QUERY_LOG_GENERAL; + goto end; } - if (table_name_len == 8 && !(lower_case_table_names ? - my_strcasecmp(system_charset_info, table_name, "slow_log") : - strcmp(table_name, "slow_log"))) + if (table->table_name_length == 8 && + !my_strcasecmp(table_alias_charset, table_name, "slow_log")) { - if (!check_if_opened || logger.is_log_table_enabled(QUERY_LOG_SLOW)) - return QUERY_LOG_SLOW; - return 0; + result= QUERY_LOG_SLOW; + goto end; } } return 0; + +end: + if (!check_if_opened || logger.is_log_table_enabled(result)) + { + if (error_msg) + my_error(ER_BAD_LOG_STATEMENT, MYF(0), error_msg); + return result; + } + return 0; } @@ -581,7 +631,7 @@ void Log_to_csv_event_handler::cleanup() indicated in the return value. 
@retval FALSE OK - @retval TRUE error occured + @retval TRUE error occurred */ bool Log_to_csv_event_handler:: @@ -668,7 +718,8 @@ bool Log_to_csv_event_handler:: /* do a write */ if (table->field[1]->store(user_host, user_host_len, client_cs) || table->field[2]->store((longlong) thread_id, TRUE) || - table->field[3]->store((longlong) server_id, TRUE) || + table->field[3]->store((longlong) global_system_variables.server_id, + TRUE) || table->field[4]->store(command_type, command_type_len, client_cs)) goto err; @@ -745,7 +796,7 @@ err: RETURN FALSE - OK - TRUE - error occured + TRUE - error occurred */ bool Log_to_csv_event_handler:: @@ -763,8 +814,8 @@ bool Log_to_csv_event_handler:: Open_tables_backup open_tables_backup; CHARSET_INFO *client_cs= thd->variables.character_set_client; bool save_time_zone_used; - long query_time= (long) min(query_utime/1000000, TIME_MAX_VALUE_SECONDS); - long lock_time= (long) min(lock_utime/1000000, TIME_MAX_VALUE_SECONDS); + long query_time= (long) MY_MIN(query_utime/1000000, TIME_MAX_VALUE_SECONDS); + long lock_time= (long) MY_MIN(lock_utime/1000000, TIME_MAX_VALUE_SECONDS); long query_time_micro= (long) (query_utime % 1000000); long lock_time_micro= (long) (lock_utime % 1000000); @@ -826,10 +877,10 @@ bool Log_to_csv_event_handler:: if (table->field[3]->store_time(&t)) goto err; /* rows_sent */ - if (table->field[4]->store((longlong) thd->sent_row_count, TRUE)) + if (table->field[4]->store((longlong) thd->get_sent_row_count(), TRUE)) goto err; /* rows_examined */ - if (table->field[5]->store((longlong) thd->examined_row_count, TRUE)) + if (table->field[5]->store((longlong) thd->get_examined_row_count(), TRUE)) goto err; /* fill database field */ @@ -865,7 +916,7 @@ bool Log_to_csv_event_handler:: table->field[8]->set_notnull(); } - if (table->field[9]->store((longlong) server_id, TRUE)) + if (table->field[9]->store((longlong)global_system_variables.server_id, TRUE)) goto err; table->field[9]->set_notnull(); @@ -877,6 +928,9 @@ bool Log_to_csv_event_handler:: if (table->field[10]->store(sql_text, sql_text_len, client_cs) < 0) goto err; + if (table->field[11]->store((longlong) thd->thread_id, TRUE)) + goto err; + /* log table entries are not replicated */ if (table->file->ha_write_row(table->record[0])) goto err; @@ -1047,7 +1101,7 @@ void Log_to_file_event_handler::flush() RETURN FALSE - OK - TRUE - error occured + TRUE - error occurred */ bool LOGGER::error_log_print(enum loglevel level, const char *format, @@ -1205,7 +1259,7 @@ bool LOGGER::flush_general_log() RETURN FALSE OK - TRUE error occured + TRUE error occurred */ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length, @@ -1240,7 +1294,7 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length, /* fill in user_host value: the format is "%s[%s] @ %s [%s]" */ user_host_len= (strxnmov(user_host_buff, MAX_USER_HOST_SIZE, - sctx->priv_user ? sctx->priv_user : "", "[", + sctx->priv_user, "[", sctx->user ? sctx->user : (thd->slave_thread ? "SQL_SLAVE" : ""), "] @ ", sctx->host ? sctx->host : "", " [", sctx->ip ? 
sctx->ip : "", "]", NullS) - @@ -1256,8 +1310,8 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, uint query_length, if (!query) { is_command= TRUE; - query= command_name[thd->command].str; - query_length= command_name[thd->command].length; + query= command_name[thd->get_command()].str; + query_length= command_name[thd->get_command()].length; } for (current_handler= slow_log_handler_list; *current_handler ;) @@ -1589,6 +1643,8 @@ int binlog_init(void *p) binlog_hton->close_connection= binlog_close_connection; binlog_hton->savepoint_set= binlog_savepoint_set; binlog_hton->savepoint_rollback= binlog_savepoint_rollback; + binlog_hton->savepoint_rollback_can_release_mdl= + binlog_savepoint_rollback_can_release_mdl; binlog_hton->commit= binlog_commit; binlog_hton->rollback= binlog_rollback; binlog_hton->prepare= binlog_prepare; @@ -1633,6 +1689,7 @@ static int binlog_close_connection(handlerton *hton, THD *thd) contain updates to non-transactional tables. Or it can be a flush of a statement cache. */ + static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, Log_event *end_ev, bool all, bool using_stmt, @@ -1640,6 +1697,7 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, { int error= 0; DBUG_ENTER("binlog_flush_cache"); + DBUG_PRINT("enter", ("end_ev: %p", end_ev)); if ((using_stmt && !cache_mngr->stmt_cache.empty()) || (using_trx && !cache_mngr->trx_cache.empty())) @@ -1663,6 +1721,20 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, end_ev, all, using_stmt, using_trx); } + else + { + /* + This can happen in row-format binlog with something like + BEGIN; INSERT INTO nontrans_table; INSERT IGNORE INTO trans_table; + The nontrans_table is written directly into the binlog before commit, + and if the trans_table is ignored there will be no rows to write when + we get here. + + So there is no work to do. Therefore, we will not increment any XID + count, so we must not decrement any XID count in unlog(). + */ + cache_mngr->need_unlog= 0; + } cache_mngr->reset(using_stmt, using_trx); DBUG_ASSERT((!using_stmt || cache_mngr->stmt_cache.empty()) && @@ -1684,9 +1756,10 @@ static inline int binlog_commit_flush_stmt_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { + DBUG_ENTER("binlog_commit_flush_stmt_cache"); Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), FALSE, TRUE, TRUE, 0); - return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE)); + DBUG_RETURN(binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE)); } /** @@ -1701,9 +1774,10 @@ binlog_commit_flush_stmt_cache(THD *thd, bool all, static inline int binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { + DBUG_ENTER("binlog_commit_flush_trx_cache"); Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, TRUE, 0); - return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); + DBUG_RETURN(binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); } /** @@ -1821,6 +1895,32 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) return 0; } +/* + We flush the cache wrapped in a beging/rollback if: + . aborting a single or multi-statement transaction and; + . the OPTION_KEEP_LOG is active or; + . the format is STMT and a non-trans table was updated or; + . the format is MIXED and a temporary non-trans table was + updated or; + . 
the format is MIXED, non-trans table was updated and + aborting a single statement transaction; +*/ +static bool trans_cannot_safely_rollback(THD *thd, bool all) +{ + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + + return ((thd->variables.option_bits & OPTION_KEEP_LOG) || + (trans_has_updated_non_trans_table(thd) && + thd->variables.binlog_format == BINLOG_FORMAT_STMT) || + (cache_mngr->trx_cache.changes_to_non_trans_temp_table() && + thd->variables.binlog_format == BINLOG_FORMAT_MIXED) || + (trans_has_updated_non_trans_table(thd) && + ending_single_stmt_trans(thd,all) && + thd->variables.binlog_format == BINLOG_FORMAT_MIXED)); +} + + /** This function is called once after each statement. @@ -1941,25 +2041,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) } else if (!error) { - /* - We flush the cache wrapped in a beging/rollback if: - . aborting a single or multi-statement transaction and; - . the OPTION_KEEP_LOG is active or; - . the format is STMT and a non-trans table was updated or; - . the format is MIXED and a temporary non-trans table was - updated or; - . the format is MIXED, non-trans table was updated and - aborting a single statement transaction; - */ - if (ending_trans(thd, all) && - ((thd->variables.option_bits & OPTION_KEEP_LOG) || - (trans_has_updated_non_trans_table(thd) && - thd->variables.binlog_format == BINLOG_FORMAT_STMT) || - (cache_mngr->trx_cache.changes_to_non_trans_temp_table() && - thd->variables.binlog_format == BINLOG_FORMAT_MIXED) || - (trans_has_updated_non_trans_table(thd) && - ending_single_stmt_trans(thd,all) && - thd->variables.binlog_format == BINLOG_FORMAT_MIXED))) + if (ending_trans(thd, all) && trans_cannot_safely_rollback(thd, all)) error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr); /* Truncate the cache if: @@ -1988,6 +2070,21 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) DBUG_RETURN(error); } + +void binlog_reset_cache(THD *thd) +{ + binlog_cache_mngr *const cache_mngr= opt_bin_log ? 
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton) : 0; + DBUG_ENTER("binlog_reset_cache"); + if (cache_mngr) + { + thd->binlog_remove_pending_rows_event(TRUE, TRUE); + cache_mngr->reset(true, true); + } + DBUG_VOID_RETURN; +} + + void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::set_write_error"); @@ -2025,7 +2122,7 @@ bool MYSQL_BIN_LOG::check_write_error(THD *thd) if (!thd->is_error()) DBUG_RETURN(checked); - switch (thd->stmt_da->sql_errno()) + switch (thd->get_stmt_da()->sql_errno()) { case ER_TRANS_CACHE_FULL: case ER_STMT_CACHE_FULL: @@ -2066,9 +2163,7 @@ bool MYSQL_BIN_LOG::check_write_error(THD *thd) static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) { DBUG_ENTER("binlog_savepoint_set"); - - binlog_trans_log_savepos(thd, (my_off_t*) sv); - /* Write it to the binary log */ + int error= 1; char buf[1024]; String log_query(buf, sizeof(buf), &my_charset_bin); @@ -2077,10 +2172,25 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) thd->lex->ident.str, thd->lex->ident.length)) DBUG_RETURN(1); int errcode= query_error_code(thd, thd->killed == NOT_KILLED); - Query_log_event qinfo(thd, log_query.ptr(), log_query.length(), + Query_log_event qinfo(thd, log_query.c_ptr_safe(), log_query.length(), TRUE, FALSE, TRUE, errcode); - int ret= mysql_bin_log.write(&qinfo); - DBUG_RETURN(ret); + /* + We cannot record the position before writing the statement + because a rollback to a savepoint (.e.g. consider it "S") would + prevent the savepoint statement (i.e. "SAVEPOINT S") from being + written to the binary log despite the fact that the server could + still issue other rollback statements to the same savepoint (i.e. + "S"). + Given that the savepoint is valid until the server releases it, + ie, until the transaction commits or it is released explicitly, + we need to log it anyway so that we don't have "ROLLBACK TO S" + or "RELEASE S" without the preceding "SAVEPOINT S" in the binary + log. + */ + if (!(error= mysql_bin_log.write(&qinfo))) + binlog_trans_log_savepos(thd, (my_off_t*) sv); + + DBUG_RETURN(error); } static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) @@ -2111,6 +2221,30 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) } +/** + Check whether binlog state allows to safely release MDL locks after + rollback to savepoint. + + @param hton The binlog handlerton. + @param thd The client thread that executes the transaction. + + @return true - It is safe to release MDL locks. + false - If it is not. +*/ +static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton, + THD *thd) +{ + DBUG_ENTER("binlog_savepoint_rollback_can_release_mdl"); + /* + If we have not updated any non-transactional tables rollback + to savepoint will simply truncate binlog cache starting from + SAVEPOINT command. So it should be safe to release MDL acquired + after SAVEPOINT command in this case. 
+ */ + DBUG_RETURN(!trans_cannot_safely_rollback(thd, true)); +} + + int check_binlog_magic(IO_CACHE* log, const char** errmsg) { uchar magic[4]; @@ -2245,7 +2379,7 @@ static int find_uniq_filename(char *name) DBUG_RETURN(1); } file_info= dir_info->dir_entry; - for (i= dir_info->number_off_files ; i-- ; file_info++) + for (i= dir_info->number_of_files ; i-- ; file_info++) { if (strncmp(file_info->name, start, length) == 0 && test_if_number(file_info->name+length, &number,0)) @@ -2538,7 +2672,8 @@ int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name) { if (!fn_ext(log_name)[0]) { - if (find_uniq_filename(new_name)) + if (DBUG_EVALUATE_IF("binlog_inject_new_name_error", TRUE, FALSE) || + find_uniq_filename(new_name)) { if (current_thd) my_printf_error(ER_NO_UNIQUE_LOGFILE, ER(ER_NO_UNIQUE_LOGFILE), @@ -2567,16 +2702,16 @@ int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name) void MYSQL_QUERY_LOG::reopen_file() { char *save_name; - DBUG_ENTER("MYSQL_LOG::reopen_file"); + + mysql_mutex_lock(&LOCK_log); if (!is_open()) { DBUG_PRINT("info",("log is closed")); + mysql_mutex_unlock(&LOCK_log); DBUG_VOID_RETURN; } - mysql_mutex_lock(&LOCK_log); - save_name= name; name= 0; // Don't free name close(LOG_CLOSE_TO_BE_OPENED); @@ -2620,7 +2755,7 @@ void MYSQL_QUERY_LOG::reopen_file() RETURN FASE - OK - TRUE - error occured + TRUE - error occurred */ bool MYSQL_QUERY_LOG::write(time_t event_time, const char *user_host, @@ -2722,7 +2857,7 @@ err: RETURN FALSE - OK - TRUE - error occured + TRUE - error occurred */ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time, @@ -2735,13 +2870,6 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time, DBUG_ENTER("MYSQL_QUERY_LOG::write"); mysql_mutex_lock(&LOCK_log); - - if (!is_open()) - { - mysql_mutex_unlock(&LOCK_log); - DBUG_RETURN(0); - } - if (is_open()) { // Safety agains reopen int tmp_errno= 0; @@ -2786,8 +2914,8 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time, (ulong) thd->thread_id, (thd->db ? thd->db : ""), ((thd->query_plan_flags & QPLAN_QC) ? "Yes" : "No"), query_time_buff, lock_time_buff, - (ulong) thd->sent_row_count, - (ulong) thd->examined_row_count) == (size_t) -1) + (ulong) thd->get_sent_row_count(), + (ulong) thd->get_examined_row_count()) == (size_t) -1) tmp_errno= errno; if ((thd->variables.log_slow_verbosity & LOG_SLOW_VERBOSITY_QUERY_PLAN) && (thd->query_plan_flags & @@ -2796,7 +2924,8 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time, my_b_printf(&log_file, "# Full_scan: %s Full_join: %s " "Tmp_table: %s Tmp_table_on_disk: %s\n" - "# Filesort: %s Filesort_on_disk: %s Merge_passes: %lu\n", + "# Filesort: %s Filesort_on_disk: %s Merge_passes: %lu " + "Priority_queue: %s\n", ((thd->query_plan_flags & QPLAN_FULL_SCAN) ? "Yes" : "No"), ((thd->query_plan_flags & QPLAN_FULL_JOIN) ? "Yes" : "No"), ((thd->query_plan_flags & QPLAN_TMP_TABLE) ? "Yes" : "No"), @@ -2804,8 +2933,20 @@ bool MYSQL_QUERY_LOG::write(THD *thd, time_t current_time, ((thd->query_plan_flags & QPLAN_FILESORT) ? "Yes" : "No"), ((thd->query_plan_flags & QPLAN_FILESORT_DISK) ? "Yes" : "No"), - thd->query_plan_fsort_passes) == (size_t) -1) + thd->query_plan_fsort_passes, + ((thd->query_plan_flags & QPLAN_FILESORT_PRIORITY_QUEUE) ? 
+ "Yes" : "No") + ) == (size_t) -1) tmp_errno= errno; + if (thd->variables.log_slow_verbosity & LOG_SLOW_VERBOSITY_EXPLAIN && + thd->lex->explain) + { + StringBuffer<128> buf; + DBUG_ASSERT(!thd->free_list); + if (!print_explain_query(thd->lex, thd, &buf)) + my_b_printf(&log_file, "%s", buf.c_ptr_safe()); + thd->free_items(); + } if (thd->db && strcmp(thd->db, db)) { // Database changed if (my_b_printf(&log_file,"use %s;\n",thd->db) == (size_t) -1) @@ -2895,7 +3036,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name, { char *p= fn_ext(log_name); uint length= (uint) (p - log_name); - strmake(buff, log_name, min(length, FN_REFLEN-1)); + strmake(buff, log_name, MY_MIN(length, FN_REFLEN-1)); return (const char*)buff; } return log_name; @@ -2904,15 +3045,19 @@ const char *MYSQL_LOG::generate_name(const char *log_name, MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) - :bytes_written(0), prepared_xids(0), file_id(1), open_count(1), - need_start_event(TRUE), + :reset_master_pending(0), mark_xid_done_waiting(0), + bytes_written(0), file_id(1), open_count(1), group_commit_queue(0), group_commit_queue_busy(FALSE), num_commits(0), num_group_commits(0), + group_commit_trigger_count(0), group_commit_trigger_timeout(0), + group_commit_trigger_lock_wait(0), sync_period_ptr(sync_period), sync_counter(0), + state_file_deleted(false), binlog_state_recover_done(false), is_relay_log(0), signal_cnt(0), checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF), relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), - description_event_for_exec(0), description_event_for_queue(0) + description_event_for_exec(0), description_event_for_queue(0), + current_binlog_id(0) { /* We don't want to initialize locks here as such initialization depends on @@ -2932,23 +3077,65 @@ void MYSQL_BIN_LOG::cleanup() DBUG_ENTER("cleanup"); if (inited) { + xid_count_per_binlog *b; + + /* Wait for the binlog background thread to stop. */ + if (!is_relay_log && binlog_background_thread_started) + { + mysql_mutex_lock(&LOCK_binlog_background_thread); + binlog_background_thread_stop= true; + mysql_cond_signal(&COND_binlog_background_thread); + while (binlog_background_thread_stop) + mysql_cond_wait(&COND_binlog_background_thread_end, + &LOCK_binlog_background_thread); + mysql_mutex_unlock(&LOCK_binlog_background_thread); + binlog_background_thread_started= false; + } + inited= 0; + mysql_mutex_lock(&LOCK_log); close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT); + mysql_mutex_unlock(&LOCK_log); delete description_event_for_queue; delete description_event_for_exec; + + while ((b= binlog_xid_count_list.get())) + { + /* + There should be no pending XIDs at shutdown, and only one entry (for + the active binlog file) in the list. + */ + DBUG_ASSERT(b->xid_count == 0); + DBUG_ASSERT(!binlog_xid_count_list.head()); + my_free(b); + } + mysql_mutex_destroy(&LOCK_log); mysql_mutex_destroy(&LOCK_index); + mysql_mutex_destroy(&LOCK_xid_list); + mysql_mutex_destroy(&LOCK_binlog_background_thread); mysql_cond_destroy(&update_cond); + mysql_cond_destroy(&COND_queue_busy); + mysql_cond_destroy(&COND_xid_list); + mysql_cond_destroy(&COND_binlog_background_thread); + mysql_cond_destroy(&COND_binlog_background_thread_end); } + + /* + Free data for global binlog state. 
+ We can't do that automaticly as we need to do this before + safemalloc is shut down + */ + if (!is_relay_log) + rpl_global_gtid_binlog_state.free(); DBUG_VOID_RETURN; } /* Init binlog-specific vars */ -void MYSQL_BIN_LOG::init(bool no_auto_events_arg, ulong max_size_arg) +void MYSQL_BIN_LOG::init(ulong max_size_arg) { DBUG_ENTER("MYSQL_BIN_LOG::init"); - no_auto_events= no_auto_events_arg; max_size= max_size_arg; DBUG_PRINT("info",("max_size: %lu", max_size)); DBUG_VOID_RETURN; @@ -2960,8 +3147,18 @@ void MYSQL_BIN_LOG::init_pthread_objects() MYSQL_LOG::init_pthread_objects(); mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW); mysql_mutex_setflags(&LOCK_index, MYF_NO_DEADLOCK_DETECTION); + mysql_mutex_init(key_BINLOG_LOCK_xid_list, + &LOCK_xid_list, MY_MUTEX_INIT_FAST); mysql_cond_init(m_key_update_cond, &update_cond, 0); mysql_cond_init(m_key_COND_queue_busy, &COND_queue_busy, 0); + mysql_cond_init(key_BINLOG_COND_xid_list, &COND_xid_list, 0); + + mysql_mutex_init(key_BINLOG_LOCK_binlog_background_thread, + &LOCK_binlog_background_thread, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_BINLOG_COND_binlog_background_thread, + &COND_binlog_background_thread, 0); + mysql_cond_init(key_BINLOG_COND_binlog_background_thread_end, + &COND_binlog_background_thread_end, 0); } @@ -3049,16 +3246,31 @@ bool MYSQL_BIN_LOG::open(const char *log_name, enum_log_type log_type_arg, const char *new_name, enum cache_type io_cache_type_arg, - bool no_auto_events_arg, ulong max_size_arg, bool null_created_arg, bool need_mutex) { File file= -1; - + xid_count_per_binlog *new_xid_list_entry= NULL, *b; DBUG_ENTER("MYSQL_BIN_LOG::open"); DBUG_PRINT("enter",("log_type: %d",(int) log_type_arg)); + mysql_mutex_assert_owner(&LOCK_log); + + if (!is_relay_log) + { + if (!binlog_state_recover_done) + { + binlog_state_recover_done= true; + if (do_binlog_recovery(opt_bin_logname, false)) + DBUG_RETURN(1); + } + + if (!binlog_background_thread_started && + start_binlog_background_thread()) + DBUG_RETURN(1); + } + if (init_and_set_log_file_name(log_name, new_name, log_type_arg, io_cache_type_arg)) { @@ -3078,7 +3290,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, it may be good to consider what actually happens when open_purge_index_file succeeds but register or sync fails. - Perhaps we might need the code below in MYSQL_LOG_BIN::cleanup + Perhaps we might need the code below in MYSQL_BIN_LOG::cleanup for "real life" purposes as well? */ DBUG_EXECUTE_IF("fault_injection_registering_index", { @@ -3110,7 +3322,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, DBUG_RETURN(1); /* all warnings issued */ } - init(no_auto_events_arg, max_size_arg); + init(max_size_arg); open_count++; @@ -3134,11 +3346,10 @@ bool MYSQL_BIN_LOG::open(const char *log_name, write_file_name_to_index_file= 1; } - if (need_start_event && !no_auto_events) { /* - In 4.x we set need_start_event=0 here, but in 5.0 we want a Start event - even if this is not the very first binlog. + In 4.x we put Start event only in the first binlog. But from 5.0 we + want a Start event even if this is not the very first binlog. */ Format_description_log_event s(BINLOG_VERSION); /* @@ -3165,6 +3376,101 @@ bool MYSQL_BIN_LOG::open(const char *log_name, if (s.write(&log_file)) goto err; bytes_written+= s.data_written; + + if (!is_relay_log) + { + char buf[FN_REFLEN]; + + /* + Output a Gtid_list_log_event at the start of the binlog file. 
+ + This is used to quickly determine which GTIDs are found in binlog + files earlier than this one, and which are found in this (or later) + binlogs. + + The list gives a mapping from (domain_id, server_id) -> seq_no (so + this means that there is at most one entry for every unique pair + (domain_id, server_id) in the list). It indicates that this seq_no is + the last one found in an earlier binlog file for this (domain_id, + server_id) combination - so any higher seq_no should be search for + from this binlog file, or a later one. + + This allows to locate the binlog file containing a given GTID by + scanning backwards, reading just the Gtid_list_log_event at the + start of each file, and scanning only the relevant binlog file when + found, not all binlog files. + + The existence of a given entry (domain_id, server_id, seq_no) + guarantees only that this seq_no will not be found in this or any + later binlog file. It does not guarantee that it can be found it an + earlier binlog file, for example the file may have been purged. + + If there is no entry for a given (domain_id, server_id) pair, then + it means that no such GTID exists in any earlier binlog. It is + permissible to remove such pair from future Gtid_list_log_events + if all previous binlog files containing such GTIDs have been purged + (though such optimization is not performed at the time of this + writing). So if there is no entry for given GTID it means that such + GTID should be search for in this or later binlog file, same as if + there had been an entry (domain_id, server_id, 0). + */ + + Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0); + if (gl_ev.write(&log_file)) + goto err; + + /* Output a binlog checkpoint event at the start of the binlog file. */ + + /* + Construct an entry in the binlog_xid_count_list for the new binlog + file (we will not link it into the list until we know the new file + is successfully created; otherwise we would have to remove it again + if creation failed, which gets tricky since other threads may have + seen the entry in the meantime - and we do not want to hold + LOCK_xid_list for long periods of time). + + Write the current binlog checkpoint into the log, so XA recovery will + know from where to start recovery. + */ + uint off= dirname_length(log_file_name); + uint len= strlen(log_file_name) - off; + char *entry_mem, *name_mem; + if (!(new_xid_list_entry = (xid_count_per_binlog *) + my_multi_malloc(MYF(MY_WME), + &entry_mem, sizeof(xid_count_per_binlog), + &name_mem, len, + NULL))) + goto err; + memcpy(name_mem, log_file_name+off, len); + new_xid_list_entry->binlog_name= name_mem; + new_xid_list_entry->binlog_name_len= len; + new_xid_list_entry->xid_count= 0; + + /* + Find the name for the Initial binlog checkpoint. + + Normally this will just be the first entry, as we delete entries + when their count drops to zero. But we scan the list to handle any + corner case, eg. for the first binlog file opened after startup, the + list will be empty. 
+ */ + mysql_mutex_lock(&LOCK_xid_list); + I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); + while ((b= it++) && b->xid_count == 0) + ; + mysql_mutex_unlock(&LOCK_xid_list); + if (!b) + b= new_xid_list_entry; + strmake(buf, b->binlog_name, b->binlog_name_len); + Binlog_checkpoint_log_event ev(buf, len); + DBUG_EXECUTE_IF("crash_before_write_checkpoint_event", + flush_io_cache(&log_file); + mysql_file_sync(log_file.file, MYF(MY_WME)); + DBUG_SUICIDE();); + if (ev.write(&log_file)) + goto err; + bytes_written+= ev.data_written; + } } if (description_event_for_queue && description_event_for_queue->binlog_version>=4) @@ -3235,6 +3541,42 @@ bool MYSQL_BIN_LOG::open(const char *log_name, #endif } } + + if (!is_relay_log) + { + /* + Now the file was created successfully, so we can link in the entry for + the new binlog file in binlog_xid_count_list. + */ + mysql_mutex_lock(&LOCK_xid_list); + ++current_binlog_id; + new_xid_list_entry->binlog_id= current_binlog_id; + /* Remove any initial entries with no pending XIDs. */ + while ((b= binlog_xid_count_list.head()) && b->xid_count == 0) + my_free(binlog_xid_count_list.get()); + binlog_xid_count_list.push_back(new_xid_list_entry); + mysql_mutex_unlock(&LOCK_xid_list); + + /* + Now that we have synced a new binlog file with an initial Gtid_list + event, it is safe to delete the binlog state file. We will write out + a new, updated file at shutdown, and if we crash before we can recover + the state from the newly written binlog file. + + Since the state file will contain out-of-date data as soon as the first + new GTID is binlogged, it is better to remove it, to avoid any risk of + accidentally reading incorrect data later. + */ + if (!state_file_deleted) + { + char buf[FN_REFLEN]; + fn_format(buf, opt_bin_logname, mysql_data_home, ".state", + MY_UNPACK_FILENAME); + my_delete(buf, MY_SYNC_DIR); + state_file_deleted= true; + } + } + log_state= LOG_OPENED; #ifdef HAVE_REPLICATION @@ -3253,6 +3595,8 @@ err: Turning logging off for the whole duration of the MySQL server process. \ To turn it on again: fix the cause, \ shutdown the MySQL server and restart it.", name, errno); + if (new_xid_list_entry) + my_free(new_xid_list_entry); if (file >= 0) mysql_file_close(file, MYF(0)); close(LOG_CLOSE_INDEX); @@ -3312,7 +3656,8 @@ static bool copy_up_file_and_fill(IO_CACHE *index_file, my_off_t offset) if (!bytes_read) break; // end of file mysql_file_seek(file, offset-init_offset, MY_SEEK_SET, MYF(0)); - if (mysql_file_write(file, io_buf, bytes_read, MYF(MY_WME | MY_NABP))) + if (mysql_file_write(file, io_buf, bytes_read, + MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL))) goto err; } /* The following will either truncate the file or fill the end with \n' */ @@ -3498,11 +3843,11 @@ err: /** Delete all logs refered to in the index file. - Start writing to a new log file. The new index file will only contain this file. 
- @param thd Thread + @param thd Thread + @param create_new_log 1 if we should start writing to a new log file @note If not called from slave thread, write start event to new log @@ -3513,7 +3858,8 @@ err: 1 error */ -bool MYSQL_BIN_LOG::reset_logs(THD* thd) +bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log, + rpl_gtid *init_state, uint32 init_state_len) { LOG_INFO linfo; bool error=0; @@ -3521,7 +3867,34 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) const char* save_name; DBUG_ENTER("reset_logs"); - ha_reset_logs(thd); + if (!is_relay_log) + { + if (init_state && !is_empty_state()) + { + my_error(ER_BINLOG_MUST_BE_EMPTY, MYF(0)); + DBUG_RETURN(1); + } + + /* + Mark that a RESET MASTER is in progress. + This ensures that a binlog checkpoint will not try to write binlog + checkpoint events, which would be useless (as we are deleting the binlog + anyway) and could deadlock, as we are holding LOCK_log. + + Wait for any mark_xid_done() calls that might be already running to + complete (mark_xid_done_waiting counter to drop to zero); we need to + do this before we take the LOCK_log to not deadlock. + */ + mysql_mutex_lock(&LOCK_xid_list); + reset_master_pending++; + while (mark_xid_done_waiting > 0) + mysql_cond_wait(&COND_xid_list, &LOCK_xid_list); + mysql_mutex_unlock(&LOCK_xid_list); + } + + DEBUG_SYNC(thd, "reset_logs_after_set_reset_master_pending"); + if (thd) + ha_reset_logs(thd); /* We need to get both locks to be sure that no one is trying to write to the index log file. @@ -3529,6 +3902,50 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) mysql_mutex_lock(&LOCK_log); mysql_mutex_lock(&LOCK_index); + if (!is_relay_log) + { + /* + We are going to nuke all binary log files. + Without binlog, we cannot XA recover prepared-but-not-committed + transactions in engines. So force a commit checkpoint first. + + Note that we take and immediately release LOCK_commit_ordered. This has + the effect to ensure that any on-going group commit (in + trx_group_commit_leader()) has completed before we request the checkpoint, + due to the chaining of LOCK_log and LOCK_commit_ordered in that function. + (We are holding LOCK_log, so no new group commit can start). + + Without this, it is possible (though perhaps unlikely) that the RESET + MASTER could run in-between the write to the binlog and the + commit_ordered() in the engine of some transaction, and then a crash + later would leave such transaction not recoverable. + */ + mysql_mutex_lock(&LOCK_commit_ordered); + mysql_mutex_unlock(&LOCK_commit_ordered); + + mark_xids_active(current_binlog_id, 1); + do_checkpoint_request(current_binlog_id); + + /* Now wait for all checkpoint requests and pending unlog() to complete. */ + mysql_mutex_lock(&LOCK_xid_list); + for (;;) + { + if (is_xidlist_idle_nolock()) + break; + /* + Wait until signalled that one more binlog dropped to zero, then check + again. + */ + mysql_cond_wait(&COND_xid_list, &LOCK_xid_list); + } + + /* + Now all XIDs are fully flushed to disk, and we are holding LOCK_log so + no new ones will be written. So we can proceed to delete the logs. 
+ */ + mysql_mutex_unlock(&LOCK_xid_list); + } + /* The following mutex is needed to ensure that no threads call 'delete thd' as we would then risk missing a 'rollback' from this @@ -3567,7 +3984,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) { if (my_errno == ENOENT) { - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE), linfo.log_file_name); sql_print_information("Failed to delete file '%s'", @@ -3577,7 +3994,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) } else { - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, ER_BINLOG_PURGE_FATAL_ERR, "a problem with deleting %s; " "consider examining correspondence " @@ -3592,13 +4009,21 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) break; } + if (!is_relay_log) + { + if (init_state) + rpl_global_gtid_binlog_state.load(init_state, init_state_len); + else + rpl_global_gtid_binlog_state.reset(); + } + /* Start logging with a new file */ close(LOG_CLOSE_INDEX | LOG_CLOSE_TO_BE_OPENED); if ((error= my_delete(index_file_name, MYF(0)))) // Reset (open will update) { if (my_errno == ENOENT) { - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE), index_file_name); sql_print_information("Failed to delete file '%s'", @@ -3608,7 +4033,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) } else { - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN, ER_BINLOG_PURGE_FATAL_ERR, "a problem with deleting %s; " "consider examining correspondence " @@ -3619,10 +4044,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) goto err; } } - if (!thd->slave_thread) - need_start_event=1; - if (!open_index_file(index_file_name, 0, FALSE)) - if ((error= open(save_name, log_type, 0, io_cache_type, no_auto_events, max_size, 0, FALSE))) + if (create_new_log && !open_index_file(index_file_name, 0, FALSE)) + if ((error= open(save_name, log_type, 0, io_cache_type, max_size, 0, FALSE))) goto err; my_free((void *) save_name); @@ -3630,6 +4053,31 @@ err: if (error == 1) name= const_cast<char*>(save_name); mysql_mutex_unlock(&LOCK_thread_count); + + if (!is_relay_log) + { + xid_count_per_binlog *b; + /* + Remove all entries in the xid_count list except the last. + Normally we will just be deleting all the entries that we waited for to + drop to zero above. But if we fail during RESET MASTER for some reason + then we will not have created any new log file, and we may keep the last + of the old entries. + */ + mysql_mutex_lock(&LOCK_xid_list); + for (;;) + { + b= binlog_xid_count_list.head(); + DBUG_ASSERT(b /* List can never become empty. 
*/); + if (b->binlog_id == current_binlog_id) + break; + DBUG_ASSERT(b->xid_count == 0); + my_free(binlog_xid_count_list.get()); + } + reset_master_pending--; + mysql_mutex_unlock(&LOCK_xid_list); + } + mysql_mutex_unlock(&LOCK_index); mysql_mutex_unlock(&LOCK_log); DBUG_RETURN(error); @@ -3677,17 +4125,41 @@ err: int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) { - int error; + int error, errcode; char *to_purge_if_included= NULL; + inuse_relaylog *ir; ulonglong log_space_reclaimed= 0; DBUG_ENTER("purge_first_log"); DBUG_ASSERT(is_open()); - DBUG_ASSERT(rli->slave_running == 1); + DBUG_ASSERT(rli->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT); DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name)); mysql_mutex_lock(&LOCK_index); - to_purge_if_included= my_strdup(rli->group_relay_log_name, MYF(0)); + + ir= rli->inuse_relaylog_list; + while (ir) + { + inuse_relaylog *next= ir->next; + if (!ir->completed || ir->dequeued_count < ir->queued_count) + { + included= false; + break; + } + if (!included && !strcmp(ir->name, rli->group_relay_log_name)) + break; + if (!next) + { + rli->last_inuse_relaylog= NULL; + included= 1; + to_purge_if_included= my_strdup(ir->name, MYF(0)); + } + rli->free_inuse_relaylog(ir); + ir= next; + } + rli->inuse_relaylog_list= ir; + if (ir) + to_purge_if_included= my_strdup(ir->name, MYF(0)); /* Read the next log file name from the index file and pass it back to @@ -3724,7 +4196,8 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) } /* Store where we are in the new file for the execution thread */ - flush_relay_log_info(rli); + if (flush_relay_log_info(rli)) + error= LOG_INFO_IO; DBUG_EXECUTE_IF("crash_before_purge_logs", DBUG_SUICIDE();); @@ -3740,11 +4213,13 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) * Need to update the log pos because purge logs has been called * after fetching initially the log pos at the begining of the method. 
*/ - if((error=find_log_pos(&rli->linfo, rli->event_relay_log_name, 0))) + if ((errcode= find_log_pos(&rli->linfo, rli->event_relay_log_name, 0))) { char buff[22]; + if (!error) + error= errcode; sql_print_error("next log error: %d offset: %s log: %s included: %d", - error, + errcode, llstr(rli->linfo.index_file_offset,buff), rli->group_relay_log_name, included); @@ -3834,8 +4309,7 @@ int MYSQL_BIN_LOG::purge_logs(const char *to_log, if ((error=find_log_pos(&log_info, NullS, 0 /*no mutex*/))) goto err; while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)) && - !is_active(log_info.log_file_name) && - !log_in_use(log_info.log_file_name)) + can_purge_log(log_info.log_file_name)) { if ((error= register_purge_index_entry(log_info.log_file_name))) { @@ -4023,7 +4497,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space, */ if (thd) { - push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE), log_info.log_file_name); } @@ -4038,7 +4512,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space, */ if (thd) { - push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_BINLOG_PURGE_FATAL_ERR, "a problem with getting info on being purged %s; " "consider examining correspondence " @@ -4066,7 +4540,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space, { if (thd) { - push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_BINLOG_PURGE_FATAL_ERR, "a problem with deleting %s and " "reading the binlog index file", @@ -4102,7 +4576,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space, { if (thd) { - push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_LOG_PURGE_NO_FILE, ER(ER_LOG_PURGE_NO_FILE), log_info.log_file_name); } @@ -4114,7 +4588,7 @@ int MYSQL_BIN_LOG::purge_index_entry(THD *thd, ulonglong *reclaimed_space, { if (thd) { - push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_BINLOG_PURGE_FATAL_ERR, "a problem with deleting %s; " "consider examining correspondence " @@ -4185,8 +4659,7 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time) goto err; while (strcmp(log_file_name, log_info.log_file_name) && - !is_active(log_info.log_file_name) && - !log_in_use(log_info.log_file_name)) + can_purge_log(log_info.log_file_name)) { if (!mysql_file_stat(m_key_file_log, log_info.log_file_name, &stat_area, MYF(0))) @@ -4205,7 +4678,7 @@ int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time) */ if (thd) { - push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_BINLOG_PURGE_FATAL_ERR, "a problem with getting info on being purged %s; " "consider examining correspondence " @@ -4239,9 +4712,57 @@ err: mysql_mutex_unlock(&LOCK_index); DBUG_RETURN(error); } + + +bool +MYSQL_BIN_LOG::can_purge_log(const char *log_file_name) +{ + xid_count_per_binlog *b; + + if (is_active(log_file_name)) + return false; + mysql_mutex_lock(&LOCK_xid_list); + { + I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); + while ((b= it++) && + 0 != strncmp(log_file_name+dirname_length(log_file_name), + b->binlog_name, b->binlog_name_len)) + ; + } + mysql_mutex_unlock(&LOCK_xid_list); + if (b) + 
return false; + return !log_in_use(log_file_name); +} #endif /* HAVE_REPLICATION */ +bool +MYSQL_BIN_LOG::is_xidlist_idle() +{ + bool res; + mysql_mutex_lock(&LOCK_xid_list); + res= is_xidlist_idle_nolock(); + mysql_mutex_unlock(&LOCK_xid_list); + return res; +} + + +bool +MYSQL_BIN_LOG::is_xidlist_idle_nolock() +{ + xid_count_per_binlog *b; + + I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); + while ((b= it++)) + { + if (b->xid_count > 0) + return false; + } + return true; +} + + /** Create a new log file name. @@ -4317,41 +4838,21 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) bool delay_close= false; File old_file; LINT_INIT(old_file); - DBUG_ENTER("MYSQL_BIN_LOG::new_file_impl"); - if (!is_open()) - { - DBUG_PRINT("info",("log is closed")); - DBUG_RETURN(error); - } if (need_lock) mysql_mutex_lock(&LOCK_log); - mysql_mutex_lock(&LOCK_index); - mysql_mutex_assert_owner(&LOCK_log); - mysql_mutex_assert_owner(&LOCK_index); - /* - if binlog is used as tc log, be sure all xids are "unlogged", - so that on recover we only need to scan one - latest - binlog file - for prepared xids. As this is expected to be a rare event, - simple wait strategy is enough. We're locking LOCK_log to be sure no - new Xid_log_event's are added to the log (and prepared_xids is not - increased), and waiting on COND_prep_xids for late threads to - catch up. - */ - if (prepared_xids) + if (!is_open()) { - tc_log_page_waits++; - mysql_mutex_lock(&LOCK_prep_xids); - while (prepared_xids) { - DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids)); - mysql_cond_wait(&COND_prep_xids, &LOCK_prep_xids); - } - mysql_mutex_unlock(&LOCK_prep_xids); + DBUG_PRINT("info",("log is closed")); + mysql_mutex_unlock(&LOCK_log); + DBUG_RETURN(error); } + mysql_mutex_lock(&LOCK_index); + /* Reuse old name if not binlog and not update log */ new_name_ptr= name; @@ -4366,7 +4867,6 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) if (log_type == LOG_BIN) { - if (!no_auto_events) { /* We log the whole file name for log file as the user may decide @@ -4441,7 +4941,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) /* reopen the binary log file. */ file_to_open= new_name_ptr; error= open(old_name, log_type, new_name_ptr, io_cache_type, - no_auto_events, max_size, 1, FALSE); + max_size, 1, FALSE); } /* handle reopening errors */ @@ -4485,20 +4985,31 @@ end: new_name_ptr, errno); } + mysql_mutex_unlock(&LOCK_index); if (need_lock) mysql_mutex_unlock(&LOCK_log); - mysql_mutex_unlock(&LOCK_index); DBUG_RETURN(error); } -bool MYSQL_BIN_LOG::append(Log_event* ev) +bool +MYSQL_BIN_LOG::append(Log_event *ev) { - bool error = 0; + bool res; mysql_mutex_lock(&LOCK_log); + res= append_no_lock(ev); + mysql_mutex_unlock(&LOCK_log); + return res; +} + + +bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev) +{ + bool error = 0; DBUG_ENTER("MYSQL_BIN_LOG::append"); + mysql_mutex_assert_owner(&LOCK_log); DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); /* Log_event::write() is smart enough to use my_b_write() or @@ -4513,10 +5024,9 @@ bool MYSQL_BIN_LOG::append(Log_event* ev) DBUG_PRINT("info",("max_size: %lu",max_size)); if (flush_and_sync(0)) goto err; - if ((uint) my_b_append_tell(&log_file) > max_size) + if (my_b_append_tell(&log_file) > max_size) error= new_file_without_locking(); err: - mysql_mutex_unlock(&LOCK_log); signal_update(); // Safe as we don't call close DBUG_RETURN(error); } @@ -4544,7 +5054,7 @@ bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...) 
DBUG_PRINT("info",("max_size: %lu",max_size)); if (flush_and_sync(0)) goto err; - if ((uint) my_b_append_tell(&log_file) > max_size) + if (my_b_append_tell(&log_file) > max_size) error= new_file_without_locking(); err: if (!error) @@ -4872,6 +5382,10 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, (long) table, table->s->table_name.str, table->s->table_map_id)); + /* Ensure that all events in a GTID group are in the same cache */ + if (variables.option_bits & OPTION_GTID_BEGIN) + is_transactional= 1; + /* Pre-conditions */ DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); @@ -4892,7 +5406,7 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, if (with_annotate && *with_annotate) { - Annotate_rows_log_event anno(current_thd, is_transactional, false); + Annotate_rows_log_event anno(table->in_use, is_transactional, false); /* Annotate event should be written not more than once */ *with_annotate= 0; if ((error= anno.write(file))) @@ -5057,6 +5571,244 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, DBUG_RETURN(error); } + +/* Generate a new global transaction ID, and write it to the binlog */ + +bool +MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, + bool is_transactional, uint64 commit_id) +{ + rpl_gtid gtid; + uint32 domain_id= thd->variables.gtid_domain_id; + uint32 server_id= thd->variables.server_id; + uint64 seq_no= thd->variables.gtid_seq_no; + int err; + DBUG_ENTER("write_gtid_event"); + DBUG_PRINT("enter", ("standalone: %d", standalone)); + + if (thd->variables.option_bits & OPTION_GTID_BEGIN) + { + DBUG_PRINT("error", ("OPTION_GTID_BEGIN is set. " + "Master and slave will have different GTID values")); + /* Reset the flag, as we will write out a GTID anyway */ + thd->variables.option_bits&= ~OPTION_GTID_BEGIN; + } + + /* + Reset the session variable gtid_seq_no, to reduce the risk of accidentally + producing a duplicate GTID. + */ + thd->variables.gtid_seq_no= 0; + if (seq_no != 0) + { + /* Use the specified sequence number. */ + gtid.domain_id= domain_id; + gtid.server_id= server_id; + gtid.seq_no= seq_no; + err= rpl_global_gtid_binlog_state.update(>id, opt_gtid_strict_mode); + if (err && thd->get_stmt_da()->sql_errno()==ER_GTID_STRICT_OUT_OF_ORDER) + errno= ER_GTID_STRICT_OUT_OF_ORDER; + } + else + { + /* Allocate the next sequence number for the GTID. */ + err= rpl_global_gtid_binlog_state.update_with_next_gtid(domain_id, + server_id, >id); + seq_no= gtid.seq_no; + } + if (err) + DBUG_RETURN(true); + thd->last_commit_gtid= gtid; + + Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone, + LOG_EVENT_SUPPRESS_USE_F, is_transactional, + commit_id); + + /* Write the event to the binary log. 
*/ + if (gtid_event.write(&mysql_bin_log.log_file)) + DBUG_RETURN(true); + status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written); + + DBUG_RETURN(false); +} + + +int +MYSQL_BIN_LOG::write_state_to_file() +{ + File file_no; + IO_CACHE cache; + char buf[FN_REFLEN]; + int err; + bool opened= false; + bool inited= false; + + fn_format(buf, opt_bin_logname, mysql_data_home, ".state", + MY_UNPACK_FILENAME); + if ((file_no= mysql_file_open(key_file_binlog_state, buf, + O_RDWR|O_CREAT|O_TRUNC|O_BINARY, + MYF(MY_WME))) < 0) + { + err= 1; + goto err; + } + opened= true; + if ((err= init_io_cache(&cache, file_no, IO_SIZE, WRITE_CACHE, 0, 0, + MYF(MY_WME|MY_WAIT_IF_FULL)))) + goto err; + inited= true; + if ((err= rpl_global_gtid_binlog_state.write_to_iocache(&cache))) + goto err; + inited= false; + if ((err= end_io_cache(&cache))) + goto err; + if ((err= mysql_file_sync(file_no, MYF(MY_WME|MY_SYNC_FILESIZE)))) + goto err; + goto end; + +err: + sql_print_error("Error writing binlog state to file '%s'.\n", buf); + if (inited) + end_io_cache(&cache); +end: + if (opened) + mysql_file_close(file_no, MYF(0)); + + return err; +} + + +/* + Initialize the binlog state from the master-bin.state file, at server startup. + + Returns: + 0 for success. + 2 for when .state file did not exist. + 1 for other error. +*/ +int +MYSQL_BIN_LOG::read_state_from_file() +{ + File file_no; + IO_CACHE cache; + char buf[FN_REFLEN]; + int err; + bool opened= false; + bool inited= false; + + fn_format(buf, opt_bin_logname, mysql_data_home, ".state", + MY_UNPACK_FILENAME); + if ((file_no= mysql_file_open(key_file_binlog_state, buf, + O_RDONLY|O_BINARY, MYF(0))) < 0) + { + if (my_errno != ENOENT) + { + err= 1; + goto err; + } + else + { + /* + If the state file does not exist, this is the first server startup + with GTID enabled. So initialize to empty state. 
+ */ + rpl_global_gtid_binlog_state.reset(); + err= 2; + goto end; + } + } + opened= true; + if ((err= init_io_cache(&cache, file_no, IO_SIZE, READ_CACHE, 0, 0, + MYF(MY_WME|MY_WAIT_IF_FULL)))) + goto err; + inited= true; + if ((err= rpl_global_gtid_binlog_state.read_from_iocache(&cache))) + goto err; + goto end; + +err: + sql_print_error("Error reading binlog GTID state from file '%s'.\n", buf); +end: + if (inited) + end_io_cache(&cache); + if (opened) + mysql_file_close(file_no, MYF(0)); + + return err; +} + + +int +MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) +{ + return rpl_global_gtid_binlog_state.get_most_recent_gtid_list(list, size); +} + + +bool +MYSQL_BIN_LOG::append_state_pos(String *str) +{ + return rpl_global_gtid_binlog_state.append_pos(str); +} + + +bool +MYSQL_BIN_LOG::append_state(String *str) +{ + return rpl_global_gtid_binlog_state.append_state(str); +} + + +bool +MYSQL_BIN_LOG::is_empty_state() +{ + return (rpl_global_gtid_binlog_state.count() == 0); +} + + +bool +MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id, + rpl_gtid *out_gtid) +{ + rpl_gtid *gtid; + if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id))) + *out_gtid= *gtid; + return gtid != NULL; +} + + +bool +MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id, + rpl_gtid *out_gtid) +{ + rpl_gtid *found_gtid; + + if ((found_gtid= rpl_global_gtid_binlog_state.find_most_recent(domain_id))) + { + *out_gtid= *found_gtid; + return true; + } + + return false; +} + + +int +MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no) +{ + return rpl_global_gtid_binlog_state.bump_seq_no_if_needed(domain_id, seq_no); +} + + +bool +MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id, uint32 server_id, + uint64 seq_no) +{ + return rpl_global_gtid_binlog_state.check_strict_sequence(domain_id, + server_id, seq_no); +} + + /** Write an event to the binary log. 
If with_annotate != NULL and *with_annotate = TRUE write also Annotate_rows before the event @@ -5067,11 +5819,21 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) { THD *thd= event_info->thd; bool error= 1; - DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)"); binlog_cache_data *cache_data= 0; bool is_trans_cache= FALSE; bool using_trans= event_info->use_trans_cache(); bool direct= event_info->use_direct_logging(); + ulong prev_binlog_id; + DBUG_ENTER("MYSQL_BIN_LOG::write(Log_event *)"); + LINT_INIT(prev_binlog_id); + + if (thd->variables.option_bits & OPTION_GTID_BEGIN) + { + DBUG_PRINT("info", ("OPTION_GTID_BEGIN was set")); + /* Wait for commit from binary log before we commit */ + direct= 0; + using_trans= 1; + } if (thd->binlog_evt_union.do_union) { @@ -5120,9 +5882,26 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) if (direct) { + int res; + uint64 commit_id= 0; + DBUG_PRINT("info", ("direct is set")); + if ((res= thd->wait_for_prior_commit())) + DBUG_RETURN(res); file= &log_file; my_org_b_tell= my_b_tell(file); mysql_mutex_lock(&LOCK_log); + prev_binlog_id= current_binlog_id; + DBUG_EXECUTE_IF("binlog_force_commit_id", + { + const LEX_STRING name= { C_STRING_WITH_LEN("commit_id") }; + bool null_value; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, + (uchar*) name.str, name.length); + commit_id= entry->val_int(&null_value); + }); + if (write_gtid_event(thd, true, using_trans, commit_id)) + goto err; } else { @@ -5266,7 +6045,7 @@ err: mysql_mutex_unlock(&LOCK_log); if (check_purge) - purge(); + checkpoint_and_purge(prev_binlog_id); } if (error) @@ -5351,6 +6130,60 @@ bool general_log_write(THD *thd, enum enum_server_command command, return FALSE; } + +static void +binlog_checkpoint_callback(void *cookie) +{ + MYSQL_BIN_LOG::xid_count_per_binlog *entry= + (MYSQL_BIN_LOG::xid_count_per_binlog *)cookie; + /* + For every supporting engine, we increment the xid_count and issue a + commit_checkpoint_request(). Then we can count when all + commit_checkpoint_notify() callbacks have occurred, and then log a new + binlog checkpoint event. + */ + mysql_bin_log.mark_xids_active(entry->binlog_id, 1); +} + + +/* + Request a commit checkpoint from each supporting engine. + This must be called after each binlog rotate, and after LOCK_log has been + released. The xid_count value in the xid_count_per_binlog entry was + incremented by 1 and will be decremented in this function; this ensures + that the entry will not go away early despite LOCK_log not being held. +*/ +void +MYSQL_BIN_LOG::do_checkpoint_request(ulong binlog_id) +{ + xid_count_per_binlog *entry; + + /* + Find the binlog entry, and invoke commit_checkpoint_request() on it in + each supporting storage engine. + */ + mysql_mutex_lock(&LOCK_xid_list); + I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); + do { + entry= it++; + DBUG_ASSERT(entry /* binlog_id is always somewhere in the list. */); + } while (entry->binlog_id != binlog_id); + mysql_mutex_unlock(&LOCK_xid_list); + + ha_commit_checkpoint_request(entry, binlog_checkpoint_callback); + /* + When we rotated the binlog, we incremented xid_count to make sure the + entry would not go away until this point, where we have done all necessary + commit_checkpoint_request() calls. + So now we can (and must) decrease the count - when it reaches zero, we + will know that both all pending unlog() and all pending + commit_checkpoint_notify() calls are done, and we can log a new binlog + checkpoint. 
+ */ + mark_xid_done(binlog_id, true); +} + + /** The method executes rotation when LOCK_log is already acquired by the caller. @@ -5359,6 +6192,15 @@ bool general_log_write(THD *thd, enum enum_server_command command, @param check_purge is set to true if rotation took place @note + Caller _must_ check the check_purge variable. If this is set, it means + that the binlog was rotated, and caller _must_ ensure that + do_checkpoint_request() is called later with the binlog_id of the rotated + binlog file. The call to do_checkpoint_request() must happen after + LOCK_log is released (which is why we cannot simply do it here). + Usually, checkpoint_and_purge() is appropriate, as it will both handle + the checkpointing and any needed purging of old logs. + + @note If rotation fails, for instance the server was unable to create a new log file, we still try to write an incident event to the current log. @@ -5376,7 +6218,27 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge) if (force_rotate || (my_b_tell(&log_file) >= (my_off_t) max_size)) { + ulong binlog_id= current_binlog_id; + /* + We rotate the binlog, so we need to start a commit checkpoint in all + supporting engines - when it finishes, we can log a new binlog checkpoint + event. + + But we cannot start the checkpoint here - there could be a group commit + still in progress which needs to be included in the checkpoint, and + besides we do not want to do the (possibly expensive) checkpoint while + LOCK_log is held. + + On the other hand, we must be sure that the xid_count entry for the + previous log does not go away until we start the checkpoint - which it + could do as it is no longer the most recent. So we increment xid_count + (to count the pending checkpoint request) - this will fix the entry in + place until we decrement again in do_checkpoint_request(). + */ + mark_xids_active(binlog_id, 1); + if ((error= new_file_without_locking())) + { /** Be conservative... There are possible lost events (eg, failing to log the Execute_load_query_log_event @@ -5389,7 +6251,14 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge) if (!write_incident_already_locked(current_thd)) flush_and_sync(0); - *check_purge= true; + /* + We failed to rotate - so we have to decrement the xid_count back that + we incremented before attempting the rotate. + */ + mark_xid_done(binlog_id, false); + } + else + *check_purge= true; } DBUG_RETURN(error); } @@ -5417,6 +6286,13 @@ void MYSQL_BIN_LOG::purge() #endif } + +void MYSQL_BIN_LOG::checkpoint_and_purge(ulong binlog_id) +{ + do_checkpoint_request(binlog_id); + purge(); +} + /** The method is a shortcut of @c rotate() and @c purge(). LOCK_log is acquired prior to rotate and is released after it. 
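The pin/unpin protocol spelled out above (mark_xids_active() while LOCK_log is still held, the matching mark_xid_done() only after the checkpoint request has been fanned out without the lock) can be summarised in a small standalone sketch. This is not MariaDB code: the two std::mutex objects, the XidCountEntry struct and request_engine_checkpoint() are invented stand-ins for LOCK_xid_list, LOCK_log, the xid_count_per_binlog list and ha_commit_checkpoint_request(), and creation of the very first entry is omitted.

#include <list>
#include <mutex>

struct XidCountEntry { unsigned long binlog_id; long xid_count; };

static std::mutex lock_xid_list;                 // stands in for LOCK_xid_list
static std::mutex lock_log;                      // stands in for LOCK_log
static std::list<XidCountEntry> xid_count_list;  // one entry per binlog file

static void mark_xids_active(unsigned long id, long n)
{
  std::lock_guard<std::mutex> g(lock_xid_list);
  for (XidCountEntry &e : xid_count_list)
    if (e.binlog_id == id) { e.xid_count+= n; return; }
}

static void mark_xid_done(unsigned long id)
{
  std::lock_guard<std::mutex> g(lock_xid_list);
  for (XidCountEntry &e : xid_count_list)
    if (e.binlog_id == id)
    {
      --e.xid_count;
      /* In the real code, a non-current binlog reaching zero here is what
         allows a new binlog checkpoint event to be logged. */
      return;
    }
}

static void request_engine_checkpoint(unsigned long) {}   // fan-out placeholder

static void rotate_and_checkpoint(unsigned long &current_binlog_id)
{
  unsigned long prev;
  {
    std::lock_guard<std::mutex> g(lock_log);
    prev= current_binlog_id;
    mark_xids_active(prev, 1);        // pin the old entry while LOCK_log is held
    ++current_binlog_id;              // plays the role of new_file_without_locking()
    xid_count_list.push_back(XidCountEntry{current_binlog_id, 0});
  }
  /* LOCK_log released; the pinned count keeps the old entry alive until now. */
  request_engine_checkpoint(prev);
  mark_xid_done(prev);                // unpin; may complete the checkpoint
}

The property that matters is the one the rotate() caller contract demands: the extra count is taken while the old file is still protected by LOCK_log, and is dropped only after the (possibly expensive) checkpoint request has been issued outside the lock.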
@@ -5429,11 +6305,13 @@ void MYSQL_BIN_LOG::purge() int MYSQL_BIN_LOG::rotate_and_purge(bool force_rotate) { int error= 0; + ulong prev_binlog_id; DBUG_ENTER("MYSQL_BIN_LOG::rotate_and_purge"); bool check_purge= false; //todo: fix the macro def and restore safe_mutex_assert_not_owner(&LOCK_log); mysql_mutex_lock(&LOCK_log); + prev_binlog_id= current_binlog_id; if ((error= rotate(force_rotate, &check_purge))) check_purge= false; /* @@ -5443,7 +6321,7 @@ int MYSQL_BIN_LOG::rotate_and_purge(bool force_rotate) mysql_mutex_unlock(&LOCK_log); if (check_purge) - purge(); + checkpoint_and_purge(prev_binlog_id); DBUG_RETURN(error); } @@ -5512,9 +6390,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) long val; ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t uchar header[LOG_EVENT_HEADER_LEN]; - ha_checksum crc= 0, crc_0= 0; // assignments to keep compiler happy + ha_checksum crc= 0, crc_0= 0; my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF); uchar buf[BINLOG_CHECKSUM_LEN]; + DBUG_ENTER("MYSQL_BIN_LOG::write_cache"); // while there is just one alg the following must hold: DBUG_ASSERT(!do_checksum || @@ -5536,9 +6415,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) group= (uint)my_b_tell(&log_file); hdr_offs= carry= 0; - if (do_checksum) - crc= crc_0= my_checksum(0L, NULL, 0); - + do { /* @@ -5567,7 +6444,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) /* write the first half of the split header */ if (my_b_write(&log_file, header, carry)) - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); status_var_add(thd->status_var.binlog_bytes_written, carry); /* @@ -5611,12 +6488,12 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) crc= my_checksum(crc, cache->read_pos, length); remains -= length; if (my_b_write(&log_file, cache->read_pos, length)) - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); if (remains == 0) { int4store(buf, crc); if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); crc= crc_0; } } @@ -5643,7 +6520,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) DBUG_ASSERT(remains == 0); if (my_b_write(&log_file, cache->read_pos, hdr_offs) || my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); crc= crc_0; } } @@ -5675,12 +6552,12 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) length, &crc); if (my_b_write(&log_file, ev, remains == 0 ? 
event_len : length - hdr_offs)) - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); if (remains == 0) { int4store(buf, crc); if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); crc= crc_0; // crc is complete } } @@ -5705,10 +6582,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) /* Write data to the binary log file */ DBUG_EXECUTE_IF("fail_binlog_write_1", - errno= 28; return ER_ERROR_ON_WRITE;); + errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE);); if (!do_checksum) if (my_b_write(&log_file, cache->read_pos, length)) - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); status_var_add(thd->status_var.binlog_bytes_written, length); cache->read_pos=cache->read_end; // Mark buffer used up @@ -5718,7 +6595,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) DBUG_ASSERT(!do_checksum || remains == 0); DBUG_ASSERT(!do_checksum || crc == crc_0); - return 0; // All OK + DBUG_RETURN(0); // All OK } /* @@ -5730,9 +6607,9 @@ int query_error_code(THD *thd, bool not_killed) if (not_killed || (killed_mask_hard(thd->killed) == KILL_BAD_DATA)) { - error= thd->is_error() ? thd->stmt_da->sql_errno() : 0; + error= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0; - /* thd->stmt_da->sql_errno() might be ER_SERVER_SHUTDOWN or + /* thd->get_get_stmt_da()->sql_errno() might be ER_SERVER_SHUTDOWN or ER_QUERY_INTERRUPTED, So here we need to make sure that error is not set to these errors when specified not_killed by the caller. @@ -5774,11 +6651,13 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) uint error= 0; my_off_t offset; bool check_purge= false; + ulong prev_binlog_id; DBUG_ENTER("MYSQL_BIN_LOG::write_incident"); mysql_mutex_lock(&LOCK_log); if (likely(is_open())) { + prev_binlog_id= current_binlog_id; if (!(error= write_incident_already_locked(thd)) && !(error= flush_and_sync(0))) { @@ -5798,7 +6677,7 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) mysql_mutex_unlock(&LOCK_log); if (check_purge) - purge(); + checkpoint_and_purge(prev_binlog_id); } else { @@ -5808,6 +6687,45 @@ bool MYSQL_BIN_LOG::write_incident(THD *thd) DBUG_RETURN(error); } +void +MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name, + uint len) +{ + my_off_t offset; + Binlog_checkpoint_log_event ev(name, len); + /* + Note that we must sync the binlog checkpoint to disk. + Otherwise a subsequent log purge could delete binlogs that XA recovery + thinks are needed (even though they are not really). + */ + if (!ev.write(&log_file) && !flush_and_sync(0)) + { + signal_update(); + } + else + { + /* + If we fail to write the checkpoint event, something is probably really + bad with the binlog. We complain in the error log. + + Note that failure to write binlog checkpoint does not compromise the + ability to do crash recovery - crash recovery will just have to scan a + bit more of the binlog than strictly necessary. + */ + sql_print_error("Failed to write binlog checkpoint event to binary log\n"); + } + + offset= my_b_tell(&log_file); + /* + Take mutex to protect against a reader seeing partial writes of 64-bit + offset on 32-bit CPUs. + */ + mysql_mutex_lock(&LOCK_commit_ordered); + last_commit_pos_offset= offset; + mysql_mutex_unlock(&LOCK_commit_ordered); +} + + /** Write a cached log entry to the binary log. 
- To support transaction over replication, we wrap the transaction @@ -5840,6 +6758,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, bool using_trx_cache) { group_commit_entry entry; + Ha_trx_info *ha_info; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); entry.thd= thd; @@ -5848,20 +6767,16 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, entry.all= all; entry.using_stmt_cache= using_stmt_cache; entry.using_trx_cache= using_trx_cache; + entry.need_unlog= false; + ha_info= all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + for (; ha_info; ha_info= ha_info->next()) + { + if (ha_info->is_started() && ha_info->ht() != binlog_hton && + !ha_info->ht()->commit_checkpoint_request) + entry.need_unlog= true; + break; + } - /* - Log "BEGIN" at the beginning of every transaction. Here, a transaction is - either a BEGIN..COMMIT block or a single statement in autocommit mode. - - Create the necessary events here, where we have the correct THD (and - thread context). - - Due to group commit the actual writing to binlog may happen in a different - thread. - */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE, - TRUE, 0); - entry.begin_event= &qinfo; entry.end_event= end_ev; if (cache_mngr->stmt_cache.has_incident() || cache_mngr->trx_cache.has_incident()) @@ -5877,45 +6792,343 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, } } -bool -MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) + +/* + Put a transaction that is ready to commit in the group commit queue. + The transaction is identified by the ENTRY object passed into this function. + + To facilitate group commit for the binlog, we first queue up ourselves in + this function. Then later the first thread to enter the queue waits for + the LOCK_log mutex, and commits for everyone in the queue once it gets the + lock. Any other threads in the queue just wait for the first one to finish + the commit and wake them up. This way, all transactions in the queue get + committed in a single disk operation. + + The main work in this function is when the commit in one transaction has + been marked to wait for the commit of another transaction to happen + first. This is used to support in-order parallel replication, where + transactions can execute out-of-order but need to be committed in-order with + how they happened on the master. The waiting of one commit on another needs + to be integrated with the group commit queue, to ensure that the waiting + transaction can participate in the same group commit as the waited-for + transaction. + + So when we put a transaction in the queue, we check if there were other + transactions already prepared to commit but just waiting for the first one + to commit. If so, we add those to the queue as well, transitively for all + waiters. + + And if a transaction is marked to wait for a prior transaction, but that + prior transaction is already queued for group commit, then we can queue the + new transaction directly to participate in the group commit. 
+ + @retval < 0 Error + @retval > 0 If queued as the first entry in the queue (meaning this + is the leader) + @retval 0 Otherwise (queued as participant, leader handles the commit) +*/ + +int +MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) { + group_commit_entry *entry, *orig_queue, *last; + wait_for_commit *cur; + wait_for_commit *wfc; + DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit"); + + /* + Check if we need to wait for another transaction to commit before us. + + It is safe to do a quick check without lock first in the case where we do + not have to wait. But if the quick check shows we need to wait, we must do + another safe check under lock, to avoid the race where the other + transaction wakes us up between the check and the wait. + */ + wfc= orig_entry->thd->wait_for_commit_ptr; + orig_entry->queued_by_other= false; + if (wfc && wfc->waitee) + { + mysql_mutex_lock(&wfc->LOCK_wait_commit); + /* + Do an extra check here, this time safely under lock. + + If waitee->commit_started is set, it means that the transaction we need + to wait for has already queued up for group commit. In this case it is + safe for us to queue up immediately as well, increasing the opprtunities + for group commit. Because waitee has taken the LOCK_prepare_ordered + before setting the flag, so there is no risk that we can queue ahead of + it. + */ + if (wfc->waitee && !wfc->waitee->commit_started) + { + PSI_stage_info old_stage; + wait_for_commit *loc_waitee; + + /* + By setting wfc->opaque_pointer to our own entry, we mark that we are + ready to commit, but waiting for another transaction to commit before + us. + + This other transaction may then take over the commit process for us to + get us included in its own group commit. If this happens, the + queued_by_other flag is set. + + Setting this flag may or may not be seen by the other thread, but we + are safe in any case: The other thread will set queued_by_other under + its LOCK_wait_commit, and we will not check queued_by_other only after + we have been woken up. + */ + wfc->opaque_pointer= orig_entry; + DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior"); + orig_entry->thd->ENTER_COND(&wfc->COND_wait_commit, + &wfc->LOCK_wait_commit, + &stage_waiting_for_prior_transaction_to_commit, + &old_stage); + while ((loc_waitee= wfc->waitee) && !orig_entry->thd->check_killed()) + mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); + wfc->opaque_pointer= NULL; + DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d", + orig_entry->queued_by_other)); + + if (loc_waitee) + { + /* Wait terminated due to kill. */ + mysql_mutex_lock(&loc_waitee->LOCK_wait_commit); + if (loc_waitee->wakeup_subsequent_commits_running || + orig_entry->queued_by_other) + { + /* Our waitee is already waking us up, so ignore the kill. */ + mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); + do + { + mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); + } while (wfc->waitee); + } + else + { + /* We were killed, so remove us from the list of waitee. */ + wfc->remove_from_list(&loc_waitee->subsequent_commits_list); + mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); + wfc->waitee= NULL; + + orig_entry->thd->EXIT_COND(&old_stage); + /* Interrupted by kill. 
*/ + DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior_killed"); + wfc->wakeup_error= orig_entry->thd->killed_errno(); + if (!wfc->wakeup_error) + wfc->wakeup_error= ER_QUERY_INTERRUPTED; + my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0)); + DBUG_RETURN(-1); + } + } + orig_entry->thd->EXIT_COND(&old_stage); + } + else + mysql_mutex_unlock(&wfc->LOCK_wait_commit); + } /* - To facilitate group commit for the binlog, we first queue up ourselves in - the group commit queue. Then the first thread to enter the queue waits for - the LOCK_log mutex, and commits for everyone in the queue once it gets the - lock. Any other threads in the queue just wait for the first one to finish - the commit and wake them up. + If the transaction we were waiting for has already put us into the group + commit queue (and possibly already done the entire binlog commit for us), + then there is nothing else to do. */ + if (orig_entry->queued_by_other) + DBUG_RETURN(0); + + if (wfc && wfc->wakeup_error) + { + my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); + DBUG_RETURN(-1); + } - entry->thd->clear_wakeup_ready(); + /* Now enqueue ourselves in the group commit queue. */ + DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue"); + orig_entry->thd->clear_wakeup_ready(); mysql_mutex_lock(&LOCK_prepare_ordered); - group_commit_entry *orig_queue= group_commit_queue; - entry->next= orig_queue; - group_commit_queue= entry; + orig_queue= group_commit_queue; + + /* + Iteratively process everything added to the queue, looking for waiters, + and their waiters, and so on. If a waiter is ready to commit, we + immediately add it to the queue, and mark it as queued_by_other. + + This would be natural to do with recursion, but we want to avoid + potentially unbounded recursion blowing the C stack, so we use the list + approach instead. + + We keep a list of the group_commit_entry of all the waiters that need to + be processed. Initially this list contains only the entry passed into this + function. + + We process entries in the list one by one. The element currently being + processed is pointed to by `entry`, and the element at the end of the list + is pointed to by `last` (we do not use NULL to terminate the list). + + As we process an entry, any waiters for that entry are added at the end of + the list, to be processed in subsequent iterations. The the entry is added + to the group_commit_queue. This continues until the list is exhausted, + with all entries ever added eventually processed. + + The end result is a breath-first traversal of the tree of waiters, + re-using the `next' pointers of the group_commit_entry objects in place of + extra stack space in a recursive traversal. + + The temporary list linked through these `next' pointers is not used by the + caller or any other function; it only exists while doing the iterative + tree traversal. After, all the processed entries are linked into the + group_commit_queue. 
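A standalone sketch of the traversal technique described in the comment above, with invented names (Entry, waiters, flatten_waiters), no locking, and the assumption that each waiter appears only once; the real code guarantees that by unlinking entries from the waitee's list as it processes them:

#include <vector>

struct Entry
{
  Entry *next= nullptr;               // doubles as the work-list link
  std::vector<Entry*> waiters;        // transactions waiting on this one
};

/* Flatten the tree of waiters rooted at *first into a single chain,
   breadth-first and without recursion.  `last` marks the end of the work
   list, so the links never need NULL-termination while we iterate. */
static Entry *flatten_waiters(Entry *first)
{
  Entry *entry= first, *last= first;
  for (;;)
  {
    for (Entry *w : entry->waiters)   // append this entry's waiters at the tail
    {
      last->next= w;
      last= w;
    }
    if (entry == last)                // nothing left to process
      break;
    entry= entry->next;               // move to the next queued waiter
  }
  return first;                       // all entries now chained via ->next
}

The `last` pointer plays exactly the role the comment gives it: the same `next` links that later carry the group commit queue are reused as the work-list links, so the traversal needs no extra stack or heap space.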
+ */ - if (entry->cache_mngr->using_xa) + cur= wfc; + last= orig_entry; + entry= orig_entry; + for (;;) { - DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered"); - run_prepare_ordered(entry->thd, entry->all); - DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered"); + group_commit_entry *next_entry; + + if (entry->cache_mngr->using_xa) + { + DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered"); + run_prepare_ordered(entry->thd, entry->all); + DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered"); + } + + if (cur) + { + /* + Now that we have taken LOCK_prepare_ordered and will queue up in the + group commit queue, it is safe for following transactions to queue + themselves. We will grab here any transaction that is now ready to + queue up, but after that, more transactions may become ready while the + leader is waiting to start the group commit. So set the flag + `commit_started', so that later transactions can still participate in + the group commit.. + */ + cur->commit_started= true; + + /* + Check if this transaction has other transaction waiting for it to + commit. + + If so, process the waiting transactions, and their waiters and so on, + transitively. + */ + if (cur->subsequent_commits_list) + { + wait_for_commit *waiter, **waiter_ptr; + + mysql_mutex_lock(&cur->LOCK_wait_commit); + /* + Grab the list, now safely under lock, and process it if still + non-empty. + */ + waiter= cur->subsequent_commits_list; + waiter_ptr= &cur->subsequent_commits_list; + while (waiter) + { + wait_for_commit *next_waiter= waiter->next_subsequent_commit; + group_commit_entry *entry2= + (group_commit_entry *)waiter->opaque_pointer; + if (entry2) + { + /* + This is another transaction ready to be written to the binary + log. We can put it into the queue directly, without needing a + separate context switch to the other thread. We just set a flag + so that the other thread will know when it wakes up that it was + already processed. + + So remove it from the list of our waiters, and instead put it at + the end of the list to be processed in a subsequent iteration of + the outer loop. + */ + *waiter_ptr= next_waiter; + entry2->queued_by_other= true; + last->next= entry2; + last= entry2; + /* + As a small optimisation, we do not actually need to set + entry2->next to NULL, as we can use the pointer `last' to check + for end-of-list. + */ + } + else + { + /* + This transaction is not ready to participate in the group commit + yet, so leave it in the waiter list. It might join the group + commit later, if it completes soon enough to do so (it will see + our wfc->commit_started flag set), or it might commit later in a + later group commit. + */ + waiter_ptr= &waiter->next_subsequent_commit; + } + waiter= next_waiter; + } + mysql_mutex_unlock(&cur->LOCK_wait_commit); + } + } + + /* + Handle the heuristics that if another transaction is waiting for this + transaction (or if it does so later), then we want to trigger group + commit immediately, without waiting for the binlog_commit_wait_usec + timeout to expire. + */ + entry->thd->waiting_on_group_commit= true; + + /* Add the entry to the group commit queue. */ + next_entry= entry->next; + entry->next= group_commit_queue; + group_commit_queue= entry; + if (entry == last) + break; + /* + Move to the next entry in the flattened list of waiting transactions + that still need to be processed transitively. 
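The flag set just above feeds the binlog_commit_wait_usec machinery added later in this patch (wait_for_sufficient_commits() and binlog_trigger_immediate_group_commit()). A minimal standalone sketch of that trigger, using std::condition_variable in place of COND_prepare_ordered and invented names (queued, has_waiter, report_lock_wait); the real code additionally re-scans the queue and re-acquires LOCK_log:

#include <chrono>
#include <condition_variable>
#include <mutex>

static std::mutex m;                    // stands in for LOCK_prepare_ordered
static std::condition_variable cond;    // stands in for COND_prepare_ordered
static unsigned queued= 0;              // commits currently in the queue
static bool has_waiter= false;          // someone is blocked on a queued trx

static void enqueue_commit()
{
  std::lock_guard<std::mutex> lk(m);
  ++queued;
  cond.notify_one();                    // may satisfy the leader's wait_count
}

/* Leader: wait for more commits to join the group, but give up early on
   timeout or when a queued transaction is blocking another one. */
static void wait_for_sufficient_commits(unsigned wait_count,
                                        std::chrono::microseconds wait_usec)
{
  std::unique_lock<std::mutex> lk(m);
  cond.wait_for(lk, wait_usec, [&]{
    return queued >= wait_count || has_waiter;
  });
}

/* Called when transaction T1 starts waiting on a row lock held by a queued
   transaction: cut the group-commit delay short so T1 is not stalled. */
static void report_lock_wait()
{
  std::lock_guard<std::mutex> lk(m);
  has_waiter= true;
  cond.notify_one();
}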
+ */ + entry= next_entry; + DBUG_ASSERT(entry != NULL); + cur= entry->thd->wait_for_commit_ptr; } + + if (opt_binlog_commit_wait_count > 0 && orig_queue != NULL) + mysql_cond_signal(&COND_prepare_ordered); mysql_mutex_unlock(&LOCK_prepare_ordered); - DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered"); + DEBUG_SYNC(orig_entry->thd, "commit_after_release_LOCK_prepare_ordered"); + + DBUG_PRINT("info", ("Queued for group commit as %s\n", + (orig_queue == NULL) ? "leader" : "participant")); + DBUG_RETURN(orig_queue == NULL); +} + +bool +MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) +{ + int is_leader= queue_for_group_commit(entry); /* - The first in the queue handle group commit for all; the others just wait + The first in the queue handles group commit for all; the others just wait to be signalled when group commit is done. */ - if (orig_queue != NULL) + if (is_leader < 0) + return true; /* Error */ + else if (is_leader) + trx_group_commit_leader(entry); + else if (!entry->queued_by_other) entry->thd->wait_for_wakeup_ready(); else - trx_group_commit_leader(entry); + { + /* + If we were queued by another prior commit, then we are woken up + only when the leader has already completed the commit for us. + So nothing to do here then. + */ + } if (!opt_optimize_thread_scheduling) { /* For the leader, trx_group_commit_leader() already took the lock. */ - if (orig_queue != NULL) + if (!is_leader) mysql_mutex_lock(&LOCK_commit_ordered); DEBUG_SYNC(entry->thd, "commit_loop_entry_commit_ordered"); @@ -5931,15 +7144,41 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered"); } mysql_mutex_unlock(&LOCK_commit_ordered); + entry->thd->wakeup_subsequent_commits(entry->error); if (next) { - next->thd->signal_wakeup_ready(); + /* + Wake up the next thread in the group commit. + + The next thread can be waiting in two different ways, depending on + whether it put itself in the queue, or if it was put in queue by us + because it had to wait for us to commit first. + + So execute the appropriate wakeup, identified by the queued_by_other + field. + */ + if (next->queued_by_other) + next->thd->wait_for_commit_ptr->wakeup(entry->error); + else + next->thd->signal_wakeup_ready(); + } + else + { + /* + If we rotated the binlog, and if we are using the unoptimized thread + scheduling where every thread runs its own commit_ordered(), then we + must do the commit checkpoint and log purge here, after all + commit_ordered() calls have finished, and locks have been released. + */ + if (entry->check_purge) + checkpoint_and_purge(entry->binlog_id); } + } if (likely(!entry->error)) - return 0; + return entry->thd->wait_for_prior_commit(); switch (entry->error) { @@ -5966,8 +7205,9 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) we need to mark it as not needed for recovery (unlog() is not called for a transaction if log_xid() fails). 
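The leader/participant scheme that write_transaction_to_binlog_events() builds on reduces to the following standalone sketch. It is not server code: std::mutex and std::condition_variable stand in for LOCK_prepare_ordered and the per-THD wakeup, GroupMember and flush for the group_commit_entry and the binlog write, and the queued_by_other and commit_ordered handling is omitted.

#include <condition_variable>
#include <mutex>

struct GroupMember
{
  GroupMember *next= nullptr;
  bool done= false;
  std::mutex m;
  std::condition_variable cond;
};

static std::mutex queue_lock;             // stands in for LOCK_prepare_ordered
static GroupMember *queue_head= nullptr;

/* Returns true if the caller became the group leader. */
static bool enqueue(GroupMember *me)
{
  std::lock_guard<std::mutex> lk(queue_lock);
  me->next= queue_head;
  queue_head= me;
  return me->next == nullptr;             // first into an empty queue leads
}

static void commit_in_group(GroupMember *me)
{
  if (enqueue(me))
  {
    GroupMember *grabbed;
    {
      std::lock_guard<std::mutex> lk(queue_lock);
      grabbed= queue_head;                // take the whole queue at once
      queue_head= nullptr;
    }
    /* ... leader writes and syncs the binlog for every grabbed member ... */
    for (GroupMember *p= grabbed; p; p= p->next)
    {
      std::lock_guard<std::mutex> lk(p->m);
      p->done= true;
      p->cond.notify_one();               // wake each participant
    }
  }
  else
  {
    std::unique_lock<std::mutex> lk(me->m);
    me->cond.wait(lk, [me]{ return me->done; });
  }
}

Anyone who enqueues after the leader empties the queue simply becomes the next leader, which is why a single disk operation can cover an arbitrary number of concurrently committing transactions.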
*/ - if (entry->cache_mngr->using_xa && entry->cache_mngr->xa_xid) - mark_xid_done(); + if (entry->cache_mngr->using_xa && entry->cache_mngr->xa_xid && + entry->cache_mngr->need_unlog) + mark_xid_done(entry->cache_mngr->binlog_id, true); return 1; } @@ -5987,32 +7227,49 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) { uint xid_count= 0; my_off_t UNINIT_VAR(commit_offset); - group_commit_entry *current; - group_commit_entry *last_in_queue; + group_commit_entry *current, *last_in_queue; group_commit_entry *queue= NULL; bool check_purge= false; + ulong binlog_id; + uint64 commit_id; DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader"); + LINT_INIT(binlog_id); - DBUG_ASSERT(is_open()); - if (likely(is_open())) // Should always be true { + DBUG_EXECUTE_IF("inject_binlog_commit_before_get_LOCK_log", + DBUG_ASSERT(!debug_sync_set_action(leader->thd, STRING_WITH_LEN + ("commit_before_get_LOCK_log SIGNAL waiting WAIT_FOR cont TIMEOUT 1"))); + ); /* Lock the LOCK_log(), and once we get it, collect any additional writes that queued up while we were waiting. */ + DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_log"); mysql_mutex_lock(&LOCK_log); DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log"); mysql_mutex_lock(&LOCK_prepare_ordered); + if (opt_binlog_commit_wait_count) + wait_for_sufficient_commits(); + /* + Note that wait_for_sufficient_commits() may have released and + re-acquired the LOCK_log and LOCK_prepare_ordered if it needed to wait. + */ current= group_commit_queue; group_commit_queue= NULL; mysql_mutex_unlock(&LOCK_prepare_ordered); + binlog_id= current_binlog_id; /* As the queue is in reverse order of entering, reverse it. */ last_in_queue= current; while (current) { group_commit_entry *next= current->next; + /* + Now that group commit is started, we can clear the flag; there is no + longer any use in waiters on this commit trying to trigger it early. + */ + current->thd->waiting_on_group_commit= false; current->next= queue; queue= current; current= next; @@ -6020,7 +7277,21 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) DBUG_ASSERT(leader == queue /* the leader should be first in queue */); /* Now we have in queue the list of transactions to be committed in order. */ + } + DBUG_ASSERT(is_open()); + if (likely(is_open())) // Should always be true + { + commit_id= (last_in_queue == leader ? 0 : (uint64)leader->thd->query_id); + DBUG_EXECUTE_IF("binlog_force_commit_id", + { + const LEX_STRING name= { C_STRING_WITH_LEN("commit_id") }; + bool null_value; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&leader->thd->user_vars, + (uchar*) name.str, name.length); + commit_id= entry->val_int(&null_value); + }); /* Commit every transaction in the queue. @@ -6041,13 +7312,31 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty()); - current->error= write_transaction_or_stmt(current); + if ((current->error= write_transaction_or_stmt(current, commit_id))) + current->commit_errno= errno; strmake_buf(cache_mngr->last_commit_pos_file, log_file_name); commit_offset= my_b_write_tell(&log_file); cache_mngr->last_commit_pos_offset= commit_offset; if (cache_mngr->using_xa && cache_mngr->xa_xid) - xid_count++; + { + /* + If all storage engines support commit_checkpoint_request(), then we + do not need to keep track of when this XID is durably committed. 
+ Instead we will just ask the storage engine to durably commit all its + XIDs when we rotate a binlog file. + */ + if (current->need_unlog) + { + xid_count++; + cache_mngr->need_unlog= true; + cache_mngr->binlog_id= binlog_id; + } + else + cache_mngr->need_unlog= false; + + cache_mngr->delayed_error= false; + } } bool synced= 0; @@ -6090,33 +7379,37 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) } /* - if any commit_events are Xid_log_event, increase the number of - prepared_xids (it's decreased in ::unlog()). Binlog cannot be rotated - if there're prepared xids in it - see the comment in new_file() for - an explanation. - If no Xid_log_events (then it's all Query_log_event) rotate binlog, - if necessary. + If any commit_events are Xid_log_event, increase the number of pending + XIDs in current binlog (it's decreased in ::unlog()). When the count in + a (not active) binlog file reaches zero, we know that it is no longer + needed in XA recovery, and we can log a new binlog checkpoint event. */ if (xid_count > 0) { - mark_xids_active(xid_count); + mark_xids_active(binlog_id, xid_count); } - else + + if (rotate(false, &check_purge)) { - if (rotate(false, &check_purge)) - { - /* - If we fail to rotate, which thread should get the error? - We give the error to the *last* transaction thread; that seems to - make the most sense, as it was the last to write to the log. - */ - last_in_queue->error= ER_ERROR_ON_WRITE; - last_in_queue->commit_errno= errno; - check_purge= false; - } - /* In case of binlog rotate, update the correct current binlog offset. */ - commit_offset= my_b_write_tell(&log_file); + /* + If we fail to rotate, which thread should get the error? + We give the error to the leader, as any my_error() thrown inside + rotate() will have been registered for the leader THD. + + However we must not return error from here - that would cause + ha_commit_trans() to abort and rollback the transaction, which would + leave an inconsistent state with the transaction committed in the + binlog but rolled back in the engine. + + Instead set a flag so that we can return error later, from unlog(), + when the transaction has been safely committed in the engine. + */ + leader->cache_mngr->delayed_error= true; + my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, errno); + check_purge= false; } + /* In case of binlog rotate, update the correct current binlog offset. */ + commit_offset= my_b_write_tell(&log_file); } DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_commit_ordered"); @@ -6130,9 +7423,6 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ mysql_mutex_unlock(&LOCK_log); - if (check_purge) - purge(); - DEBUG_SYNC(leader->thd, "commit_after_release_LOCK_log"); ++num_group_commits; @@ -6150,6 +7440,15 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) mysql_cond_wait(&COND_queue_busy, &LOCK_commit_ordered); group_commit_queue_busy= TRUE; + /* + Set these so parent can run checkpoint_and_purge() in last thread. + (When using optimized thread scheduling, we run checkpoint_and_purge() + in this function, so parent does not need to and we need not set these + values). + */ + last_in_queue->check_purge= check_purge; + last_in_queue->binlog_id= binlog_id; + /* Note that we return with LOCK_commit_ordered locked! 
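The delayed-error handling described above amounts to a simple pattern, shown here as a minimal sketch with invented names (CommitState, rotate_binlog, finish_binlog_write); the point is only the ordering, not the real error codes:

#include <cstdio>

struct CommitState
{
  bool delayed_error= false;
};

static bool rotate_binlog() { return false; /* pretend rotation failed */ }

static void finish_binlog_write(CommitState *st)
{
  if (!rotate_binlog())
  {
    /* Do not return an error here: that would make the caller roll back a
       transaction that is already durable in the binlog.  Record it instead. */
    st->delayed_error= true;
    std::fprintf(stderr, "error: binlog rotation failed\n");
  }
}

static int unlog(CommitState *st)
{
  /* By now the engine commit has completed, so reporting is safe. */
  return st->delayed_error ? 1 : 0;
}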
*/ DBUG_VOID_RETURN; } @@ -6165,8 +7464,10 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) DEBUG_SYNC(leader->thd, "commit_loop_entry_commit_ordered"); ++num_commits; - if (current->cache_mngr->using_xa && !current->error) + if (current->cache_mngr->using_xa && !current->error && + DBUG_EVALUATE_IF("skip_commit_ordered", 0, 1)) run_commit_ordered(current->thd, current->all); + current->thd->wakeup_subsequent_commits(current->error); /* Careful not to access current->next after waking up the other thread! As @@ -6174,32 +7475,40 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) */ next= current->next; if (current != leader) // Don't wake up ourself - current->thd->signal_wakeup_ready(); + { + if (current->queued_by_other) + current->thd->wait_for_commit_ptr->wakeup(current->error); + else + current->thd->signal_wakeup_ready(); + } current= next; } DEBUG_SYNC(leader->thd, "commit_after_group_run_commit_ordered"); mysql_mutex_unlock(&LOCK_commit_ordered); + DEBUG_SYNC(leader->thd, "commit_after_group_release_commit_ordered"); + + if (check_purge) + checkpoint_and_purge(binlog_id); DBUG_VOID_RETURN; } int -MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry) +MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, + uint64 commit_id) { binlog_cache_mngr *mngr= entry->cache_mngr; + DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_or_stmt"); - if (entry->begin_event->write(&log_file)) - return ER_ERROR_ON_WRITE; - status_var_add(entry->thd->status_var.binlog_bytes_written, - entry->begin_event->data_written); + if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id)) + DBUG_RETURN(ER_ERROR_ON_WRITE); if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE))) { entry->error_cache= &mngr->stmt_cache.cache_log; - entry->commit_errno= errno; - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); } if (entry->using_trx_cache && !mngr->trx_cache.empty()) @@ -6219,16 +7528,21 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry) if (write_cache(entry->thd, mngr->get_binlog_cache_log(TRUE))) { entry->error_cache= &mngr->trx_cache.cache_log; - entry->commit_errno= errno; - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); } } + DBUG_EXECUTE_IF("inject_error_writing_xid", + { + entry->error_cache= NULL; + errno= 28; + DBUG_RETURN(ER_ERROR_ON_WRITE); + }); + if (entry->end_event->write(&log_file)) { entry->error_cache= NULL; - entry->commit_errno= errno; - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); } status_var_add(entry->thd->status_var.binlog_bytes_written, entry->end_event->data_written); @@ -6238,27 +7552,156 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry) if (entry->incident_event->write(&log_file)) { entry->error_cache= NULL; - entry->commit_errno= errno; - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); } } if (mngr->get_binlog_cache_log(FALSE)->error) // Error on read { entry->error_cache= &mngr->stmt_cache.cache_log; - entry->commit_errno= errno; - return ER_ERROR_ON_READ; + DBUG_RETURN(ER_ERROR_ON_WRITE); } if (mngr->get_binlog_cache_log(TRUE)->error) // Error on read { entry->error_cache= &mngr->trx_cache.cache_log; - entry->commit_errno= errno; - return ER_ERROR_ON_READ; + DBUG_RETURN(ER_ERROR_ON_WRITE); } - return 0; + DBUG_RETURN(0); } + +/* + Wait for sufficient commits to queue up for group commit, according to the + values of binlog_commit_wait_count and 
binlog_commit_wait_usec. + + Note that this function may release and re-acquire LOCK_log and + LOCK_prepare_ordered if it needs to wait. +*/ + +void +MYSQL_BIN_LOG::wait_for_sufficient_commits() +{ + size_t count; + group_commit_entry *e; + group_commit_entry *last_head; + struct timespec wait_until; + + mysql_mutex_assert_owner(&LOCK_log); + mysql_mutex_assert_owner(&LOCK_prepare_ordered); + + for (e= last_head= group_commit_queue, count= 0; e; e= e->next) + { + if (++count >= opt_binlog_commit_wait_count) + { + group_commit_trigger_count++; + return; + } + if (unlikely(e->thd->has_waiter)) + { + group_commit_trigger_lock_wait++; + return; + } + } + + mysql_mutex_unlock(&LOCK_log); + set_timespec_nsec(wait_until, (ulonglong)1000*opt_binlog_commit_wait_usec); + + for (;;) + { + int err; + group_commit_entry *head; + + err= mysql_cond_timedwait(&COND_prepare_ordered, &LOCK_prepare_ordered, + &wait_until); + if (err == ETIMEDOUT) + { + group_commit_trigger_timeout++; + break; + } + if (unlikely(last_head->thd->has_waiter)) + { + group_commit_trigger_lock_wait++; + break; + } + head= group_commit_queue; + for (e= head; e && e != last_head; e= e->next) + { + ++count; + if (unlikely(e->thd->has_waiter)) + { + group_commit_trigger_lock_wait++; + goto after_loop; + } + } + if (count >= opt_binlog_commit_wait_count) + { + group_commit_trigger_count++; + break; + } + last_head= head; + } +after_loop: + + /* + We must not wait for LOCK_log while holding LOCK_prepare_ordered. + LOCK_log can be held for long periods (eg. we do I/O under it), while + LOCK_prepare_ordered must only be held for short periods. + + In addition, waiting for LOCK_log while holding LOCK_prepare_ordered would + violate locking order of LOCK_log-before-LOCK_prepare_ordered. This could + cause SAFEMUTEX warnings (even if it cannot actually deadlock with current + code, as there can be at most one group commit leader thread at a time). + + So release and re-acquire LOCK_prepare_ordered if we need to wait for the + LOCK_log. + */ + if (mysql_mutex_trylock(&LOCK_log)) + { + mysql_mutex_unlock(&LOCK_prepare_ordered); + mysql_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_prepare_ordered); + } +} + + +void +MYSQL_BIN_LOG::binlog_trigger_immediate_group_commit() +{ + group_commit_entry *head; + mysql_mutex_assert_owner(&LOCK_prepare_ordered); + head= group_commit_queue; + if (head) + { + head->thd->has_waiter= true; + mysql_cond_signal(&COND_prepare_ordered); + } +} + + +/* + This function is called when a transaction T1 goes to wait for another + transaction T2. It is used to cut short any binlog group commit delay from + --binlog-commit-wait-count in the case where another transaction is stalled + on the wait due to conflicting row locks. + + If T2 is already ready to group commit, any waiting group commit will be + signalled to proceed immediately. Otherwise, a flag will be set in T2, and + when T2 later becomes ready, immediate group commit will be triggered. +*/ +void +binlog_report_wait_for(THD *thd1, THD *thd2) +{ + if (opt_binlog_commit_wait_count == 0) + return; + mysql_mutex_lock(&LOCK_prepare_ordered); + thd2->has_waiter= true; + if (thd2->waiting_on_group_commit) + mysql_bin_log.binlog_trigger_immediate_group_commit(); + mysql_mutex_unlock(&LOCK_prepare_ordered); +} + + /** Wait until we get a signal that the relay log has been updated. 
@@ -6272,15 +7715,14 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry) void MYSQL_BIN_LOG::wait_for_update_relay_log(THD* thd) { - const char *old_msg; + PSI_stage_info old_stage; DBUG_ENTER("wait_for_update_relay_log"); - old_msg= thd->enter_cond(&update_cond, &LOCK_log, - "Slave has read all relay log; " - "waiting for the slave I/O " - "thread to update it" ); + thd->ENTER_COND(&update_cond, &LOCK_log, + &stage_slave_has_read_all_relay_log, + &old_stage); mysql_cond_wait(&update_cond, &LOCK_log); - thd->exit_cond(old_msg); + thd->EXIT_COND(&old_stage); DBUG_VOID_RETURN; } @@ -6335,12 +7777,16 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd, void MYSQL_BIN_LOG::close(uint exiting) { // One can't set log_type here! + bool failed_to_save_state= false; DBUG_ENTER("MYSQL_BIN_LOG::close"); DBUG_PRINT("enter",("exiting: %d", (int) exiting)); + + mysql_mutex_assert_owner(&LOCK_log); + if (log_state == LOG_OPENED) { #ifdef HAVE_REPLICATION - if (log_type == LOG_BIN && !no_auto_events && + if (log_type == LOG_BIN && (exiting & LOG_CLOSE_STOP_EVENT)) { Stop_log_event s; @@ -6352,6 +7798,27 @@ void MYSQL_BIN_LOG::close(uint exiting) s.write(&log_file); bytes_written+= s.data_written; signal_update(); + + /* + When we shut down server, write out the binlog state to a separate + file so we do not have to scan an entire binlog file to recover it + at next server start. + + Note that this must be written and synced to disk before marking the + last binlog file as "not crashed". + */ + if (!is_relay_log && write_state_to_file()) + { + sql_print_error("Failed to save binlog GTID state during shutdown. " + "Binlog will be marked as crashed, so that crash " + "recovery can recover the state at next server " + "startup."); + /* + Leave binlog file marked as crashed, so we can recover state by + scanning it now that we failed to write out the state properly. + */ + failed_to_save_state= true; + } } #endif /* HAVE_REPLICATION */ @@ -6360,7 +7827,8 @@ void MYSQL_BIN_LOG::close(uint exiting) && !(exiting & LOG_CLOSE_DELAYED_CLOSE)) { my_off_t org_position= mysql_file_tell(log_file.file, MYF(0)); - clear_inuse_flag_when_closing(log_file.file); + if (!failed_to_save_state) + clear_inuse_flag_when_closing(log_file.file); /* Restore position so that anything we have in the IO_cache is written to the correct position. @@ -6564,7 +8032,7 @@ static void print_buffer_to_nt_eventlog(enum loglevel level, char *buff, DBUG_ENTER("print_buffer_to_nt_eventlog"); /* Add ending CR/LF's to string, overwrite last chars if necessary */ - strmov(buffptr+min(length, buffLen-5), "\r\n\r\n"); + strmov(buffptr+MY_MIN(length, buffLen-5), "\r\n\r\n"); setup_windows_event_source(); if ((event= RegisterEventSource(NULL,"MySQL"))) @@ -6598,16 +8066,33 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer, time_t skr; struct tm tm_tmp; struct tm *start; + THD *thd; + int tag_length= 0; + char tag[NAME_LEN]; DBUG_ENTER("print_buffer_to_file"); DBUG_PRINT("enter",("buffer: %s", buffer)); + if (mysqld_server_initialized && (thd= current_thd)) + { + if (thd->connection_name.length) + { + /* + Add tag for slaves so that the user can see from which connection + the error originates. 
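The interesting part of the close() change above is the ordering: the GTID state file must be durably on disk before the binlog is marked as cleanly closed, otherwise a crash in between could leave a "clean" binlog whose saved state is stale. A rough POSIX sketch of that ordering, with invented names and plain open()/fsync()/pwrite() in place of the server's my_* wrappers and clear_inuse_flag_when_closing():

#include <cstdio>
#include <fcntl.h>
#include <unistd.h>

static bool save_state_then_mark_clean(const char *state_path,
                                       const char *state_buf, size_t len,
                                       int binlog_fd, off_t flag_offset,
                                       unsigned char clean_flags)
{
  int fd= open(state_path, O_WRONLY | O_CREAT | O_TRUNC, 0660);
  if (fd < 0)
    return false;                         // leave the binlog marked in-use
  bool ok= write(fd, state_buf, len) == (ssize_t)len && fsync(fd) == 0;
  close(fd);
  if (!ok)
    return false;                         // ditto: crash recovery will rescan
  /* Only now is it safe to clear the "in use" flag in the binlog header. */
  return pwrite(binlog_fd, &clean_flags, 1, flag_offset) == 1;
}

If either the write or the sync fails, the binlog stays flagged as in use, which is exactly the fallback the comment above relies on: recovery rebuilds the state by scanning the file instead of trusting the state file.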
+ */ + tag_length= my_snprintf(tag, sizeof(tag), ER(ER_MASTER_LOG_PREFIX), + (int) thd->connection_name.length, + thd->connection_name.str); + } + } + mysql_mutex_lock(&LOCK_error_log); skr= my_time(0); localtime_r(&skr, &tm_tmp); start=&tm_tmp; - fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d [%s] %.*s\n", + fprintf(stderr, "%02d%02d%02d %2d:%02d:%02d [%s] %.*s%.*s\n", start->tm_year % 100, start->tm_mon+1, start->tm_mday, @@ -6616,6 +8101,7 @@ static void print_buffer_to_file(enum loglevel level, const char *buffer, start->tm_sec, (level == ERROR_LEVEL ? "ERROR" : level == WARNING_LEVEL ? "Warning" : "Note"), + tag_length, tag, (int) length, buffer); fflush(stderr); @@ -6761,6 +8247,9 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, mysql_mutex_unlock(&LOCK_prepare_ordered); } + if (thd->wait_for_prior_commit()) + return 0; + cookie= 0; if (xid) cookie= log_one_transaction(xid); @@ -6974,6 +8463,8 @@ int TC_LOG_MMAP::open(const char *opt_name) mysql_mutex_init(key_LOCK_sync, &LOCK_sync, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_active, &LOCK_active, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_pool, &LOCK_pool, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_pending_checkpoint, &LOCK_pending_checkpoint, + MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_active, &COND_active, 0); mysql_cond_init(key_COND_pool, &COND_pool, 0); mysql_cond_init(key_TC_LOG_MMAP_COND_queue_busy, &COND_queue_busy, 0); @@ -7224,17 +8715,93 @@ int TC_LOG_MMAP::sync() return err; } +static void +mmap_do_checkpoint_callback(void *data) +{ + TC_LOG_MMAP::pending_cookies *pending= + static_cast<TC_LOG_MMAP::pending_cookies *>(data); + ++pending->pending_count; +} + +int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid) +{ + pending_cookies *full_buffer= NULL; + DBUG_ASSERT(*(my_xid *)(data+cookie) == xid); + + /* + Do not delete the entry immediately, as there may be participating storage + engines which implement commit_checkpoint_request(), and thus have not yet + flushed the commit durably to disk. + + Instead put it in a queue - and periodically, we will request a checkpoint + from all engines and delete a whole batch at once. + */ + mysql_mutex_lock(&LOCK_pending_checkpoint); + if (pending_checkpoint == NULL) + { + uint32 size= sizeof(*pending_checkpoint); + if (!(pending_checkpoint= + (pending_cookies *)my_malloc(size, MYF(MY_ZEROFILL)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), size); + mysql_mutex_unlock(&LOCK_pending_checkpoint); + return 1; + } + } + + pending_checkpoint->cookies[pending_checkpoint->count++]= cookie; + if (pending_checkpoint->count == sizeof(pending_checkpoint->cookies) / + sizeof(pending_checkpoint->cookies[0])) + { + full_buffer= pending_checkpoint; + pending_checkpoint= NULL; + } + mysql_mutex_unlock(&LOCK_pending_checkpoint); + + if (full_buffer) + { + /* + We do an extra increment and notify here - this ensures that + things work also if there are no engines at all that support + commit_checkpoint_request. 
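The extra increment-and-self-notify above is a small reference-counting trick worth spelling out. A standalone sketch, with invented names (PendingBatch, request_checkpoint); the batch is assumed to be heap-allocated, and in the real code the engines call the notify function asynchronously rather than inline:

#include <atomic>
#include <cstdio>
#include <vector>

struct PendingBatch
{
  std::atomic<unsigned> pending{0};
  std::vector<unsigned long> cookies;     // entries to delete when all done
};

static void delete_entry(unsigned long cookie)
{ std::printf("delete cookie %lu\n", cookie); }

/* Completion callback: the last decrement frees the batch. */
static void checkpoint_notify(PendingBatch *b)
{
  if (b->pending.fetch_sub(1) == 1)
  {
    for (unsigned long c : b->cookies)
      delete_entry(c);
    delete b;                             // b must have been new-ed
  }
}

/* Fan a checkpoint request out to `engines` participants.  The self
   reference taken up front means the batch cannot complete while we are
   still handing it out, and it also completes correctly when engines == 0. */
static void request_checkpoint(PendingBatch *b, unsigned engines)
{
  b->pending.fetch_add(1);                // count ourselves first
  for (unsigned i= 0; i < engines; i++)
  {
    b->pending.fetch_add(1);
    checkpoint_notify(b);                 // engine i's durable-commit callback
  }
  checkpoint_notify(b);                   // drop our own reference
}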
+ */ + ++full_buffer->pending_count; + ha_commit_checkpoint_request(full_buffer, mmap_do_checkpoint_callback); + commit_checkpoint_notify(full_buffer); + } + return 0; +} + + +void +TC_LOG_MMAP::commit_checkpoint_notify(void *cookie) +{ + uint count; + pending_cookies *pending= static_cast<pending_cookies *>(cookie); + mysql_mutex_lock(&LOCK_pending_checkpoint); + DBUG_ASSERT(pending->pending_count > 0); + count= --pending->pending_count; + mysql_mutex_unlock(&LOCK_pending_checkpoint); + if (count == 0) + { + uint i; + for (i= 0; i < sizeof(pending->cookies)/sizeof(pending->cookies[0]); ++i) + delete_entry(pending->cookies[i]); + my_free(pending); + } +} + + /** erase xid from the page, update page free space counters/pointers. cookie points directly to the memory where xid was logged. */ -int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid) +int TC_LOG_MMAP::delete_entry(ulong cookie) { PAGE *p=pages+(cookie/tc_log_page_size); my_xid *x=(my_xid *)(data+cookie); - DBUG_ASSERT(*x == xid); DBUG_ASSERT(x >= p->start && x < p->end); mysql_mutex_lock(&p->lock); @@ -7258,6 +8825,7 @@ void TC_LOG_MMAP::close() mysql_mutex_destroy(&LOCK_sync); mysql_mutex_destroy(&LOCK_active); mysql_mutex_destroy(&LOCK_pool); + mysql_mutex_destroy(&LOCK_pending_checkpoint); mysql_cond_destroy(&COND_pool); mysql_cond_destroy(&COND_active); mysql_cond_destroy(&COND_queue_busy); @@ -7285,9 +8853,12 @@ void TC_LOG_MMAP::close() } if (inited>=5) // cannot do in the switch because of Windows mysql_file_delete(key_file_tclog, logname, MYF(MY_WME)); + if (pending_checkpoint) + my_free(pending_checkpoint); inited=0; } + int TC_LOG_MMAP::recover() { HASH xids; @@ -7373,26 +8944,13 @@ int TC_LOG::using_heuristic_recover() /****** transaction coordinator log for 2pc - binlog() based solution ******/ #define TC_LOG_BINLOG MYSQL_BIN_LOG -/** - @todo - keep in-memory list of prepared transactions - (add to list in log(), remove on unlog()) - and copy it to the new binlog if rotated - but let's check the behaviour of tc_log_page_waits first! -*/ - int TC_LOG_BINLOG::open(const char *opt_name) { - LOG_INFO log_info; int error= 1; DBUG_ASSERT(total_ha_2pc > 1); DBUG_ASSERT(opt_name && opt_name[0]); - mysql_mutex_init(key_BINLOG_LOCK_prep_xids, - &LOCK_prep_xids, MY_MUTEX_INIT_FAST); - mysql_cond_init(key_BINLOG_COND_prep_xids, &COND_prep_xids, 0); - if (!my_b_inited(&index_file)) { /* There was a failure to open the index file, can't open the binlog */ @@ -7402,78 +8960,22 @@ int TC_LOG_BINLOG::open(const char *opt_name) if (using_heuristic_recover()) { + mysql_mutex_lock(&LOCK_log); /* generate a new binlog to mask a corrupted one */ - open(opt_name, LOG_BIN, 0, WRITE_CACHE, 0, max_binlog_size, 0, TRUE); + open(opt_name, LOG_BIN, 0, WRITE_CACHE, max_binlog_size, 0, TRUE); + mysql_mutex_unlock(&LOCK_log); cleanup(); return 1; } - if ((error= find_log_pos(&log_info, NullS, 1))) - { - if (error != LOG_INFO_EOF) - sql_print_error("find_log_pos() failed (error: %d)", error); - else - error= 0; - goto err; - } - - { - const char *errmsg; - IO_CACHE log; - File file; - Log_event *ev=0; - Format_description_log_event fdle(BINLOG_VERSION); - char log_name[FN_REFLEN]; - - if (! 
fdle.is_valid()) - goto err; - - do - { - strmake_buf(log_name, log_info.log_file_name); - } while (!(error= find_next_log(&log_info, 1))); - - if (error != LOG_INFO_EOF) - { - sql_print_error("find_log_pos() failed (error: %d)", error); - goto err; - } - - if ((file= open_binlog(&log, log_name, &errmsg)) < 0) - { - sql_print_error("%s", errmsg); - goto err; - } - - if ((ev= Log_event::read_log_event(&log, 0, &fdle, - opt_master_verify_checksum)) && - ev->get_type_code() == FORMAT_DESCRIPTION_EVENT && - ev->flags & LOG_EVENT_BINLOG_IN_USE_F) - { - sql_print_information("Recovering after a crash using %s", opt_name); - error= recover(&log, (Format_description_log_event *)ev); - } - else - error=0; - - delete ev; - end_io_cache(&log); - mysql_file_close(file, MYF(MY_WME)); - - if (error) - goto err; - } - -err: + error= do_binlog_recovery(opt_name, true); + binlog_state_recover_done= true; return error; } /** This is called on shutdown, after ha_panic. */ void TC_LOG_BINLOG::close() { - DBUG_ASSERT(prepared_xids==0); - mysql_mutex_destroy(&LOCK_prep_xids); - mysql_cond_destroy(&COND_prep_xids); } /* @@ -7498,7 +9000,17 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, DEBUG_SYNC(thd, "binlog_after_log_and_order"); - DBUG_RETURN(!err); + if (err) + DBUG_RETURN(0); + /* + If using explicit user XA, we will not have XID. We must still return a + non-zero cookie (as zero cookie signals error). + */ + if (!xid || !cache_mngr->need_unlog) + DBUG_RETURN(BINLOG_COOKIE_DUMMY(cache_mngr->delayed_error)); + else + DBUG_RETURN(BINLOG_COOKIE_MAKE(cache_mngr->binlog_id, + cache_mngr->delayed_error)); } /* @@ -7513,92 +9025,542 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, binary log. */ void -TC_LOG_BINLOG::mark_xids_active(uint xid_count) +TC_LOG_BINLOG::mark_xids_active(ulong binlog_id, uint xid_count) { + xid_count_per_binlog *b; + DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active"); - DBUG_PRINT("info", ("xid_count=%u", xid_count)); - mysql_mutex_lock(&LOCK_prep_xids); - prepared_xids+= xid_count; - mysql_mutex_unlock(&LOCK_prep_xids); + DBUG_PRINT("info", ("binlog_id=%lu xid_count=%u", binlog_id, xid_count)); + + mysql_mutex_lock(&LOCK_xid_list); + I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); + while ((b= it++)) + { + if (b->binlog_id == binlog_id) + { + b->xid_count += xid_count; + break; + } + } + /* + As we do not delete elements until count reach zero, elements should always + be found. + */ + DBUG_ASSERT(b); + mysql_mutex_unlock(&LOCK_xid_list); DBUG_VOID_RETURN; } /* - Once an XID is committed, it is safe to rotate the binary log, as it can no - longer be needed during crash recovery. + Once an XID is committed, it can no longer be needed during crash recovery, + as it has been durably recorded on disk as "committed". This function is called to mark an XID this way. It needs to decrease the - count of pending XIDs, and signal the log rotator thread when it reaches zero. + count of pending XIDs in the corresponding binlog. When the count reaches + zero (for an "old" binlog that is not the active one), that binlog file no + longer need to be scanned during crash recovery, so we can log a new binlog + checkpoint. */ void -TC_LOG_BINLOG::mark_xid_done() +TC_LOG_BINLOG::mark_xid_done(ulong binlog_id, bool write_checkpoint) { - my_bool send_signal; + xid_count_per_binlog *b; + bool first; + ulong current; DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done"); - mysql_mutex_lock(&LOCK_prep_xids); - // prepared_xids can be 0 if the transaction had ignorable errors. 
- DBUG_ASSERT(prepared_xids >= 0); - if (prepared_xids > 0) - prepared_xids--; - send_signal= (prepared_xids == 0); - mysql_mutex_unlock(&LOCK_prep_xids); - if (send_signal) { - DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids)); - mysql_cond_signal(&COND_prep_xids); + + mysql_mutex_lock(&LOCK_xid_list); + current= current_binlog_id; + I_List_iterator<xid_count_per_binlog> it(binlog_xid_count_list); + first= true; + while ((b= it++)) + { + if (b->binlog_id == binlog_id) + { + --b->xid_count; + break; + } + first= false; + } + /* Binlog is always found, as we do not remove until count reaches 0 */ + DBUG_ASSERT(b); + /* + If a RESET MASTER is pending, we are about to remove all log files, and + the RESET MASTER thread is waiting for all pending unlog() calls to + complete while holding LOCK_log. In this case we should not log a binlog + checkpoint event (it would be deleted immediately anyway and we would + deadlock on LOCK_log) but just signal the thread. + */ + if (unlikely(reset_master_pending)) + { + mysql_cond_signal(&COND_xid_list); + mysql_mutex_unlock(&LOCK_xid_list); + DBUG_VOID_RETURN; + } + + if (likely(binlog_id == current) || b->xid_count != 0 || !first || + !write_checkpoint) + { + /* No new binlog checkpoint reached yet. */ + mysql_mutex_unlock(&LOCK_xid_list); + DBUG_VOID_RETURN; } + + /* + Now log a binlog checkpoint for the first binlog file with a non-zero count. + + Note that it is possible (though perhaps unlikely) that when count of + binlog (N-2) drops to zero, binlog (N-1) is already at zero. So we may + need to skip several entries before we find the one to log in the binlog + checkpoint event. + + We chain the locking of LOCK_xid_list and LOCK_log, so that we ensure that + Binlog_checkpoint_events are logged in order. This simplifies recovery a + bit, as it can just take the last binlog checkpoint in the log, rather + than compare all found against each other to find the one pointing to the + most recent binlog. + + Note also that we need to first release LOCK_xid_list, then aquire + LOCK_log, then re-aquire LOCK_xid_list. If we were to take LOCK_log while + holding LOCK_xid_list, we might deadlock with other threads that take the + locks in the opposite order. + */ + + ++mark_xid_done_waiting; + mysql_mutex_unlock(&LOCK_xid_list); + mysql_mutex_lock(&LOCK_log); + mysql_mutex_lock(&LOCK_xid_list); + --mark_xid_done_waiting; + if (unlikely(reset_master_pending)) + mysql_cond_signal(&COND_xid_list); + /* We need to reload current_binlog_id due to release/re-take of lock. */ + current= current_binlog_id; + + for (;;) + { + /* Remove initial element(s) with zero count. */ + b= binlog_xid_count_list.head(); + /* + We must not remove all elements in the list - the entry for the current + binlog must be present always. + */ + DBUG_ASSERT(b); + if (b->binlog_id == current || b->xid_count > 0) + break; + my_free(binlog_xid_count_list.get()); + } + + mysql_mutex_unlock(&LOCK_xid_list); + write_binlog_checkpoint_event_already_locked(b->binlog_name, + b->binlog_name_len); + mysql_mutex_unlock(&LOCK_log); DBUG_VOID_RETURN; } int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) { DBUG_ENTER("TC_LOG_BINLOG::unlog"); - if (xid) - mark_xid_done(); - /* As ::write_transaction_to_binlog() did not rotate, do it here. 
*/ - DBUG_RETURN(rotate_and_purge(0)); + if (!xid) + DBUG_RETURN(0); + + if (!BINLOG_COOKIE_IS_DUMMY(cookie)) + mark_xid_done(BINLOG_COOKIE_GET_ID(cookie), true); + /* + See comment in trx_group_commit_leader() - if rotate() gave a failure, + we delay the return of error code to here. + */ + DBUG_RETURN(BINLOG_COOKIE_GET_ERROR_FLAG(cookie)); +} + +void +TC_LOG_BINLOG::commit_checkpoint_notify(void *cookie) +{ + xid_count_per_binlog *entry= static_cast<xid_count_per_binlog *>(cookie); + mysql_mutex_lock(&LOCK_binlog_background_thread); + entry->next_in_queue= binlog_background_thread_queue; + binlog_background_thread_queue= entry; + mysql_cond_signal(&COND_binlog_background_thread); + mysql_mutex_unlock(&LOCK_binlog_background_thread); +} + +/* + Binlog background thread. + + This thread is used to log binlog checkpoints in the background, rather than + in the context of random storage engine threads that happen to call + commit_checkpoint_notify_ha() and may not like the delays while syncing + binlog to disk or may not be setup with all my_thread_init() and other + necessary stuff. + + In the future, this thread could also be used to do log rotation in the + background, which could elimiate all stalls around binlog rotations. +*/ +pthread_handler_t +binlog_background_thread(void *arg __attribute__((unused))) +{ + bool stop; + MYSQL_BIN_LOG::xid_count_per_binlog *queue, *next; + THD *thd; + my_thread_init(); + DBUG_ENTER("binlog_background_thread"); + + thd= new THD; + thd->system_thread= SYSTEM_THREAD_BINLOG_BACKGROUND; + thd->thread_stack= (char*) &thd; /* Set approximate stack start */ + mysql_mutex_lock(&LOCK_thread_count); + thd->thread_id= thread_id++; + mysql_mutex_unlock(&LOCK_thread_count); + thd->store_globals(); + thd->security_ctx->skip_grants(); + thd->set_command(COM_DAEMON); + + /* + Load the slave replication GTID state from the mysql.gtid_slave_pos + table. + + This is mostly so that we can start our seq_no counter from the highest + seq_no seen by a slave. This way, we have a way to tell if a transaction + logged by ourselves as master is newer or older than a replicated + transaction. + */ +#ifdef HAVE_REPLICATION + if (rpl_load_gtid_slave_state(thd)) + sql_print_warning("Failed to load slave replication state from table " + "%s.%s: %u: %s", "mysql", + rpl_gtid_slave_state_table_name.str, + thd->get_stmt_da()->sql_errno(), + thd->get_stmt_da()->message()); +#endif + + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + binlog_background_thread_started= true; + mysql_cond_signal(&mysql_bin_log.COND_binlog_background_thread_end); + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + + for (;;) + { + /* + Wait until there is something in the queue to process, or we are asked + to shut down. + */ + THD_STAGE_INFO(thd, stage_binlog_waiting_background_tasks); + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + for (;;) + { + stop= binlog_background_thread_stop; + queue= binlog_background_thread_queue; + if (stop && !mysql_bin_log.is_xidlist_idle()) + { + /* + Delay stop until all pending binlog checkpoints have been processed. + */ + stop= false; + } + if (stop || queue) + break; + mysql_cond_wait(&mysql_bin_log.COND_binlog_background_thread, + &mysql_bin_log.LOCK_binlog_background_thread); + } + /* Grab the queue, if any. */ + binlog_background_thread_queue= NULL; + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + + /* Process any incoming commit_checkpoint_notify() calls. 
*/ + DBUG_EXECUTE_IF("inject_binlog_background_thread_before_mark_xid_done", + DBUG_ASSERT(!debug_sync_set_action( + thd, + STRING_WITH_LEN("binlog_background_thread_before_mark_xid_done " + "SIGNAL injected_binlog_background_thread " + "WAIT_FOR something_that_will_never_happen " + "TIMEOUT 2"))); + ); + while (queue) + { + THD_STAGE_INFO(thd, stage_binlog_processing_checkpoint_notify); + DEBUG_SYNC(thd, "binlog_background_thread_before_mark_xid_done"); + /* Set the thread start time */ + thd->set_time(); + /* Grab next pointer first, as mark_xid_done() may free the element. */ + next= queue->next_in_queue; + mysql_bin_log.mark_xid_done(queue->binlog_id, true); + queue= next; + + DBUG_EXECUTE_IF("binlog_background_checkpoint_processed", + DBUG_ASSERT(!debug_sync_set_action( + thd, + STRING_WITH_LEN("now SIGNAL binlog_background_checkpoint_processed"))); + ); + } + + if (stop) + break; + } + + THD_STAGE_INFO(thd, stage_binlog_stopping_background_thread); + + mysql_mutex_lock(&LOCK_thread_count); + delete thd; + mysql_mutex_unlock(&LOCK_thread_count); + + my_thread_end(); + + /* Signal that we are (almost) stopped. */ + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + binlog_background_thread_stop= false; + mysql_cond_signal(&mysql_bin_log.COND_binlog_background_thread_end); + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + + DBUG_RETURN(0); } -int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle) +#ifdef HAVE_PSI_INTERFACE +static PSI_thread_key key_thread_binlog; + +static PSI_thread_info all_binlog_threads[]= +{ + { &key_thread_binlog, "binlog_background", PSI_FLAG_GLOBAL}, +}; +#endif /* HAVE_PSI_INTERFACE */ + +static bool +start_binlog_background_thread() { - Log_event *ev; + pthread_t th; + +#ifdef HAVE_PSI_INTERFACE + if (PSI_server) + PSI_server->register_thread("sql", all_binlog_threads, + array_elements(all_binlog_threads)); +#endif + + if (mysql_thread_create(key_thread_binlog, &th, &connection_attrib, + binlog_background_thread, NULL)) + return 1; + + /* + Wait for the thread to have started (so we know that the slave replication + state is loaded and we have correct global_gtid_counter). + */ + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + while (!binlog_background_thread_started) + mysql_cond_wait(&mysql_bin_log.COND_binlog_background_thread_end, + &mysql_bin_log.LOCK_binlog_background_thread); + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + + return 0; +} + + +int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, + IO_CACHE *first_log, + Format_description_log_event *fdle, bool do_xa) +{ + Log_event *ev= NULL; HASH xids; MEM_ROOT mem_root; + char binlog_checkpoint_name[FN_REFLEN]; + bool binlog_checkpoint_found; + bool first_round; + IO_CACHE log; + File file= -1; + const char *errmsg; +#ifdef HAVE_REPLICATION + rpl_gtid last_gtid; + bool last_gtid_standalone= false; + bool last_gtid_valid= false; +#endif if (! 
fdle->is_valid() || - my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0, - sizeof(my_xid), 0, 0, MYF(0))) + (do_xa && my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0, + sizeof(my_xid), 0, 0, MYF(0)))) goto err1; - init_alloc_root(&mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE); + if (do_xa) + init_alloc_root(&mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE, MYF(0)); fdle->flags&= ~LOG_EVENT_BINLOG_IN_USE_F; // abort on the first error - while ((ev= Log_event::read_log_event(log, 0, fdle, - opt_master_verify_checksum)) - && ev->is_valid()) + /* + Scan the binlog for XIDs that need to be committed if still in the + prepared stage. + + Start with the latest binlog file, then continue with any other binlog + files if the last found binlog checkpoint indicates it is needed. + */ + + binlog_checkpoint_found= false; + first_round= true; + for (;;) { - if (ev->get_type_code() == XID_EVENT) + while ((ev= Log_event::read_log_event(first_round ? first_log : &log, + 0, fdle, opt_master_verify_checksum)) + && ev->is_valid()) + { + enum Log_event_type typ= ev->get_type_code(); + switch (typ) + { + case XID_EVENT: + { + if (do_xa) + { + Xid_log_event *xev=(Xid_log_event *)ev; + uchar *x= (uchar *) memdup_root(&mem_root, (uchar*) &xev->xid, + sizeof(xev->xid)); + if (!x || my_hash_insert(&xids, x)) + goto err2; + } + break; + } + case BINLOG_CHECKPOINT_EVENT: + if (first_round && do_xa) + { + uint dir_len; + Binlog_checkpoint_log_event *cev= (Binlog_checkpoint_log_event *)ev; + if (cev->binlog_file_len >= FN_REFLEN) + sql_print_warning("Incorrect binlog checkpoint event with too " + "long file name found."); + else + { + /* + Note that we cannot use make_log_name() here, as we have not yet + initialised MYSQL_BIN_LOG::log_file_name. + */ + dir_len= dirname_length(last_log_name); + strmake(strnmov(binlog_checkpoint_name, last_log_name, dir_len), + cev->binlog_file_name, FN_REFLEN - 1 - dir_len); + binlog_checkpoint_found= true; + } + } + break; + case GTID_LIST_EVENT: + if (first_round) + { + Gtid_list_log_event *glev= (Gtid_list_log_event *)ev; + + /* Initialise the binlog state from the Gtid_list event. */ + if (rpl_global_gtid_binlog_state.load(glev->list, glev->count)) + goto err2; + } + break; + +#ifdef HAVE_REPLICATION + case GTID_EVENT: + if (first_round) + { + Gtid_log_event *gev= (Gtid_log_event *)ev; + + /* Update the binlog state with any GTID logged after Gtid_list. */ + last_gtid.domain_id= gev->domain_id; + last_gtid.server_id= gev->server_id; + last_gtid.seq_no= gev->seq_no; + last_gtid_standalone= + ((gev->flags2 & Gtid_log_event::FL_STANDALONE) ? true : false); + last_gtid_valid= true; + } + break; +#endif + + default: + /* Nothing. */ + break; + } + +#ifdef HAVE_REPLICATION + if (last_gtid_valid && + ((last_gtid_standalone && !ev->is_part_of_group(typ)) || + (!last_gtid_standalone && + (typ == XID_EVENT || + (typ == QUERY_EVENT && + (((Query_log_event *)ev)->is_commit() || + ((Query_log_event *)ev)->is_rollback())))))) + { + if (rpl_global_gtid_binlog_state.update_nolock(&last_gtid, false)) + goto err2; + last_gtid_valid= false; + } +#endif + + delete ev; + ev= NULL; + } + + if (!do_xa) + break; + /* + If the last binlog checkpoint event points to an older log, we have to + scan all logs from there also, to get all possible XIDs to recover. + + If there was no binlog checkpoint event at all, this means the log was + written by an older version of MariaDB (or MySQL) - these always have an + (implicit) binlog checkpoint event at the start of the last binlog file. 
+ */
+ if (first_round)
 {
- Xid_log_event *xev=(Xid_log_event *)ev;
- uchar *x= (uchar *) memdup_root(&mem_root, (uchar*) &xev->xid,
- sizeof(xev->xid));
- if (!x || my_hash_insert(&xids, x))
+ if (!binlog_checkpoint_found)
+ break;
+ first_round= false;
+ DBUG_EXECUTE_IF("xa_recover_expect_master_bin_000004",
+ if (0 != strcmp("./master-bin.000004", binlog_checkpoint_name) &&
+ 0 != strcmp(".\\master-bin.000004", binlog_checkpoint_name))
+ DBUG_SUICIDE();
+ );
+ if (find_log_pos(linfo, binlog_checkpoint_name, 1))
+ {
+ sql_print_error("Binlog file '%s' not found in binlog index, needed "
+ "for recovery. Aborting.", binlog_checkpoint_name);
 goto err2;
+ }
+ }
+ else
+ {
+ end_io_cache(&log);
+ mysql_file_close(file, MYF(MY_WME));
+ file= -1;
+ }
+
+ if (!strcmp(linfo->log_file_name, last_log_name))
+ break; // No more files to do
+ if ((file= open_binlog(&log, linfo->log_file_name, &errmsg)) < 0)
+ {
+ sql_print_error("%s", errmsg);
+ goto err2;
+ }
+ /*
+ We do not need to read the Format_description_log_event of other binlog
+ files. It is not possible for a binlog checkpoint to span multiple
+ binlog files written by different versions of the server. So we can use
+ the first one read for reading from all binlog files.
+ */
+ if (find_next_log(linfo, 1))
+ {
+ sql_print_error("Error reading binlog files during recovery. Aborting.");
+ goto err2;
 }
- delete ev;
 }

- if (ha_recover(&xids))
- goto err2;
+ if (do_xa)
+ {
+ if (ha_recover(&xids))
+ goto err2;

- free_root(&mem_root, MYF(0));
- my_hash_free(&xids);
+ free_root(&mem_root, MYF(0));
+ my_hash_free(&xids);
+ }
 return 0;

 err2:
- free_root(&mem_root, MYF(0));
- my_hash_free(&xids);
+ delete ev;
+ if (file >= 0)
+ {
+ end_io_cache(&log);
+ mysql_file_close(file, MYF(MY_WME));
+ }
+ if (do_xa)
+ {
+ free_root(&mem_root, MYF(0));
+ my_hash_free(&xids);
+ }
 err1:
 sql_print_error("Crash recovery failed. Either correct the problem "
 "(if it's, for example, out of memory error) and restart, "
@@ -7608,6 +9570,110 @@ err1:
 }

+int
+MYSQL_BIN_LOG::do_binlog_recovery(const char *opt_name, bool do_xa_recovery)
+{
+ LOG_INFO log_info;
+ const char *errmsg;
+ IO_CACHE log;
+ File file;
+ Log_event *ev= 0;
+ Format_description_log_event fdle(BINLOG_VERSION);
+ char log_name[FN_REFLEN];
+ int error;
+
+ if ((error= find_log_pos(&log_info, NullS, 1)))
+ {
+ /*
+ If there are no binlog files (LOG_INFO_EOF), then we still try to read
+ the .state file to restore the binlog state. This makes it possible to
+ copy a server to provision a new one without copying the binlog files
+ (except the master-bin.state file) and still preserve the correct binlog
+ state.
+ */
+ if (error != LOG_INFO_EOF)
+ sql_print_error("find_log_pos() failed (error: %d)", error);
+ else
+ {
+ error= read_state_from_file();
+ if (error == 2)
+ {
+ /*
+ No binlog files and no binlog state is not an error (e.g. just initial
+ server start after fresh installation).
+ */
+ error= 0;
+ }
+ }
+ return error;
+ }
+
+ if (!
fdle.is_valid()) + return 1; + + do + { + strmake_buf(log_name, log_info.log_file_name); + } while (!(error= find_next_log(&log_info, 1))); + + if (error != LOG_INFO_EOF) + { + sql_print_error("find_log_pos() failed (error: %d)", error); + return error; + } + + if ((file= open_binlog(&log, log_name, &errmsg)) < 0) + { + sql_print_error("%s", errmsg); + return 1; + } + + if ((ev= Log_event::read_log_event(&log, 0, &fdle, + opt_master_verify_checksum)) && + ev->get_type_code() == FORMAT_DESCRIPTION_EVENT) + { + if (ev->flags & LOG_EVENT_BINLOG_IN_USE_F) + { + sql_print_information("Recovering after a crash using %s", opt_name); + error= recover(&log_info, log_name, &log, + (Format_description_log_event *)ev, do_xa_recovery); + } + else + { + error= read_state_from_file(); + if (error == 2) + { + /* + The binlog exists, but the .state file is missing. This is normal if + this is the first master start after a major upgrade to 10.0 (with + GTID support). + + However, it could also be that the .state file was lost somehow, and + in this case it could be a serious issue, as we would set the wrong + binlog state in the next binlog file to be created, and GTID + processing would be corrupted. A common way would be copying files + from an old server to a new one and forgetting the .state file. + + So in this case, we want to try to recover the binlog state by + scanning the last binlog file (but we do not need any XA recovery). + + ToDo: We could avoid one scan at first start after major upgrade, by + detecting that there is no GTID_LIST event at the start of the + binlog file, and stopping the scan in that case. + */ + error= recover(&log_info, log_name, &log, + (Format_description_log_event *)ev, false); + } + } + } + + delete ev; + end_io_cache(&log); + mysql_file_close(file, MYF(MY_WME)); + + return error; +} + + #ifdef INNODB_COMPATIBILITY_HOOKS /** Get the file name of the MySQL binlog. @@ -7662,10 +9728,13 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var, { ulong value= *((ulong *)save); bool check_purge= false; + ulong prev_binlog_id; + LINT_INIT(prev_binlog_id); mysql_mutex_lock(mysql_bin_log.get_log_lock()); if(mysql_bin_log.is_open()) { + prev_binlog_id= mysql_bin_log.current_binlog_id; if (binlog_checksum_options != value) mysql_bin_log.checksum_alg_reset= (uint8) value; if (mysql_bin_log.rotate(true, &check_purge)) @@ -7679,7 +9748,7 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var, mysql_bin_log.checksum_alg_reset= BINLOG_CHECKSUM_ALG_UNDEF; mysql_mutex_unlock(mysql_bin_log.get_log_lock()); if (check_purge) - mysql_bin_log.purge(); + mysql_bin_log.checkpoint_and_purge(prev_binlog_id); } @@ -7745,7 +9814,7 @@ set_binlog_snapshot_file(const char *src) Copy out current values of status variables, for SHOW STATUS or information_schema.global_status. - This is called only under LOCK_status, so we can fill in a static array. + This is called only under LOCK_show_status, so we can fill in a static array. 
*/
void TC_LOG_BINLOG::set_status_variables(THD *thd)
@@ -7767,6 +9836,11 @@ TC_LOG_BINLOG::set_status_variables(THD *thd)
 binlog_snapshot_position= last_commit_pos_offset;
 }
 mysql_mutex_unlock(&LOCK_commit_ordered);
+ mysql_mutex_lock(&LOCK_prepare_ordered);
+ binlog_status_group_commit_trigger_count= this->group_commit_trigger_count;
+ binlog_status_group_commit_trigger_timeout= this->group_commit_trigger_timeout;
+ binlog_status_group_commit_trigger_lock_wait= this->group_commit_trigger_lock_wait;
+ mysql_mutex_unlock(&LOCK_prepare_ordered);
 if (have_snapshot)
 {
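
[Editor's note] The binlog checkpoint accounting in mark_xid_done() earlier in this patch is easier to follow in isolation, so here is a stand-alone sketch of the core idea only: a per-binlog count of pending XIDs, with a checkpoint emitted once the oldest binlog drains. This is not MariaDB code and not part of the patch; the names (BinlogEntry, on_xid_unlogged, the printf standing in for write_binlog_checkpoint_event_already_locked) are invented for illustration, and the LOCK_xid_list/LOCK_log locking, the RESET MASTER case and the background thread are deliberately left out.

// Simplified model only -- see note above; not part of the patch.
#include <cstdio>
#include <list>
#include <string>

struct BinlogEntry
{
  unsigned long binlog_id;
  std::string   name;
  long          xid_count;   // XIDs written to this binlog, not yet unlogged
};

static std::list<BinlogEntry> binlog_list;     // oldest binlog first
static unsigned long current_binlog_id= 0;

// Rough analogue of mark_xid_done(binlog_id, true), without any locking.
static void on_xid_unlogged(unsigned long id)
{
  bool was_first= !binlog_list.empty() && binlog_list.front().binlog_id == id;
  for (auto &b : binlog_list)
    if (b.binlog_id == id) { --b.xid_count; break; }

  // A checkpoint can only move forward when the *oldest* binlog just reached
  // zero pending XIDs and it is not the binlog currently being written.
  if (!was_first || binlog_list.front().xid_count != 0 ||
      binlog_list.front().binlog_id == current_binlog_id)
    return;

  // Drop leading fully-completed binlogs; the entry for the current binlog
  // always stays in the list (same invariant as in the patch).
  while (binlog_list.front().binlog_id != current_binlog_id &&
         binlog_list.front().xid_count == 0)
    binlog_list.pop_front();

  // Stand-in for writing a Binlog_checkpoint_event naming the oldest binlog
  // that may still be needed for crash recovery.
  std::printf("binlog checkpoint: %s\n", binlog_list.front().name.c_str());
}

int main()
{
  binlog_list= { {1, "master-bin.000001", 2}, {2, "master-bin.000002", 1} };
  current_binlog_id= 2;
  on_xid_unlogged(1);  // oldest binlog still has one pending XID: no checkpoint
  on_xid_unlogged(2);  // current binlog drains, but checkpoint cannot move yet
  on_xid_unlogged(1);  // oldest binlog drained -> checkpoint: master-bin.000002
  return 0;
}

The three calls in main() mirror the scenario described in the patch comment: the checkpoint only advances (here to master-bin.000002) once the last pending XID of the oldest binlog has been unlogged, even if newer binlogs drained earlier.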