diff options
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 385 |
1 files changed, 362 insertions, 23 deletions
diff --git a/sql/log.cc b/sql/log.cc index 088816d7954..48c458c2607 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -120,6 +120,8 @@ static MYSQL_BIN_LOG::xid_count_per_binlog * static bool start_binlog_background_thread(); +static rpl_binlog_state rpl_global_gtid_binlog_state; + /** purge logs, master and slave sides both, related error code convertor. @@ -686,7 +688,8 @@ bool Log_to_csv_event_handler:: /* do a write */ if (table->field[1]->store(user_host, user_host_len, client_cs) || table->field[2]->store((longlong) thread_id, TRUE) || - table->field[3]->store((longlong) server_id, TRUE) || + table->field[3]->store((longlong) global_system_variables.server_id, + TRUE) || table->field[4]->store(command_type, command_type_len, client_cs)) goto err; @@ -883,7 +886,7 @@ bool Log_to_csv_event_handler:: table->field[8]->set_notnull(); } - if (table->field[9]->store((longlong) server_id, TRUE)) + if (table->field[9]->store((longlong)global_system_variables.server_id, TRUE)) goto err; table->field[9]->set_notnull(); @@ -2938,7 +2941,7 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period) bytes_written(0), file_id(1), open_count(1), group_commit_queue(0), group_commit_queue_busy(FALSE), num_commits(0), num_group_commits(0), - sync_period_ptr(sync_period), sync_counter(0), + sync_period_ptr(sync_period), sync_counter(0), state_read(false), is_relay_log(0), signal_cnt(0), checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF), relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), @@ -3132,6 +3135,9 @@ bool MYSQL_BIN_LOG::open(const char *log_name, DBUG_ENTER("MYSQL_BIN_LOG::open"); DBUG_PRINT("enter",("log_type: %d",(int) log_type_arg)); + if (!is_relay_log && read_state_from_file()) + DBUG_RETURN(1); + if (!is_relay_log && !binlog_background_thread_started && start_binlog_background_thread()) DBUG_RETURN(1); @@ -3245,6 +3251,47 @@ bool MYSQL_BIN_LOG::open(const char *log_name, if (!is_relay_log) { char buf[FN_REFLEN]; + + /* + Output a Gtid_list_log_event at the start of the binlog file. 
+ + This is used to quickly determine which GTIDs are found in binlog + files earlier than this one, and which are found in this (or later) + binlogs. + + The list gives a mapping from (domain_id, server_id) -> seq_no (so + this means that there is at most one entry for every unique pair + (domain_id, server_id) in the list). It indicates that this seq_no is + the last one found in an earlier binlog file for this (domain_id, + server_id) combination - so any higher seq_no should be searched for + from this binlog file, or a later one. + + This allows to locate the binlog file containing a given GTID by + scanning backwards, reading just the Gtid_list_log_event at the + start of each file, and scanning only the relevant binlog file when + found, not all binlog files. + + The existence of a given entry (domain_id, server_id, seq_no) + guarantees only that this seq_no will not be found in this or any + later binlog file. It does not guarantee that it can be found in an + earlier binlog file, for example the file may have been purged. + + If there is no entry for a given (domain_id, server_id) pair, then + it means that no such GTID exists in any earlier binlog. It is + permissible to remove such pair from future Gtid_list_log_events + if all previous binlog files containing such GTIDs have been purged + (though such optimization is not performed at the time of this + writing). So if there is no entry for given GTID it means that such + GTID should be searched for in this or later binlog file, same as if + there had been an entry (domain_id, server_id, 0). + */ + + Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state); + if (gl_ev.write(&log_file)) + goto err; + + /* Output a binlog checkpoint event at the start of the binlog file. 
*/ + /* Construct an entry in the binlog_xid_count_list for the new binlog file (we will not link it into the list until we know the new file @@ -3671,7 +3718,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log) const char* save_name; DBUG_ENTER("reset_logs"); - ha_reset_logs(thd); + if (thd) + ha_reset_logs(thd); /* We need to get both locks to be sure that no one is trying to write to the index log file. @@ -3796,6 +3844,14 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log) break; } + if (!is_relay_log) + { + rpl_global_gtid_binlog_state.reset(); + mysql_mutex_lock(&LOCK_gtid_counter); + global_gtid_counter= 0; + mysql_mutex_unlock(&LOCK_gtid_counter); + } + /* Start logging with a new file */ close(LOG_CLOSE_INDEX | LOG_CLOSE_TO_BE_OPENED); if ((error= my_delete_allow_opened(index_file_name, MYF(0)))) // Reset (open will update) @@ -5310,6 +5366,213 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, DBUG_RETURN(error); } + +/* Generate a new global transaction ID, and write it to the binlog */ +bool +MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, + bool is_transactional) +{ + rpl_gtid gtid; + uint64 seq_no; + + seq_no= thd->variables.gtid_seq_no; + /* + Reset the session variable gtid_seq_no, to reduce the risk of accidentally + producing a duplicate GTID. + */ + thd->variables.gtid_seq_no= 0; + if (seq_no != 0) + { + /* + If we see a higher sequence number, use that one as the basis of any + later generated sequence numbers. + */ + bump_seq_no_counter_if_needed(seq_no); + } + else + { + mysql_mutex_lock(&LOCK_gtid_counter); + seq_no= ++global_gtid_counter; + mysql_mutex_unlock(&LOCK_gtid_counter); + } + gtid.seq_no= seq_no; + gtid.domain_id= thd->variables.gtid_domain_id; + + Gtid_log_event gtid_event(thd, gtid.seq_no, gtid.domain_id, standalone, + LOG_EVENT_SUPPRESS_USE_F, is_transactional); + gtid.server_id= gtid_event.server_id; + + /* Write the event to the binary log. 
*/ + if (gtid_event.write(&mysql_bin_log.log_file)) + return true; + status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written); + + /* Update the replication state (last GTID in each replication domain). */ + mysql_mutex_lock(&LOCK_rpl_gtid_state); + rpl_global_gtid_binlog_state.update(&gtid); + mysql_mutex_unlock(&LOCK_rpl_gtid_state); + return false; +} + + +int +MYSQL_BIN_LOG::write_state_to_file() +{ + File file_no; + IO_CACHE cache; + char buf[FN_REFLEN]; + int err; + bool opened= false; + bool inited= false; + + fn_format(buf, opt_bin_logname, mysql_data_home, ".state", + MY_UNPACK_FILENAME); + if ((file_no= mysql_file_open(key_file_binlog_state, buf, + O_RDWR|O_CREAT|O_TRUNC|O_BINARY, + MYF(MY_WME))) < 0) + { + err= 1; + goto err; + } + opened= true; + if ((err= init_io_cache(&cache, file_no, IO_SIZE, WRITE_CACHE, 0, 0, + MYF(MY_WME|MY_WAIT_IF_FULL)))) + goto err; + inited= true; + if ((err= rpl_global_gtid_binlog_state.write_to_iocache(&cache))) + goto err; + inited= false; + if ((err= end_io_cache(&cache))) + goto err; + if ((err= mysql_file_sync(file_no, MYF(MY_WME|MY_SYNC_FILESIZE)))) + goto err; + goto end; + +err: + sql_print_error("Error writing binlog state to file '%s'.\n", buf); + if (inited) + end_io_cache(&cache); +end: + if (opened) + mysql_file_close(file_no, MYF(0)); + + return err; +} + + +int +MYSQL_BIN_LOG::read_state_from_file() +{ + File file_no; + IO_CACHE cache; + char buf[FN_REFLEN]; + int err; + bool opened= false; + bool inited= false; + + if (state_read) + return 0; + state_read= true; + + fn_format(buf, opt_bin_logname, mysql_data_home, ".state", + MY_UNPACK_FILENAME); + if ((file_no= mysql_file_open(key_file_binlog_state, buf, + O_RDONLY|O_BINARY, MYF(0))) < 0) + { + if (my_errno != ENOENT) + { + err= 1; + goto err; + } + else + { + /* + If the state file does not exist, this is the first server startup + with GTID enabled. So initialize to empty state. 
+ */ + rpl_global_gtid_binlog_state.reset(); + err= 0; + goto end; + } + } + opened= true; + if ((err= init_io_cache(&cache, file_no, IO_SIZE, READ_CACHE, 0, 0, + MYF(MY_WME|MY_WAIT_IF_FULL)))) + goto err; + inited= true; + if ((err= rpl_global_gtid_binlog_state.read_from_iocache(&cache))) + goto err; + goto end; + +err: + sql_print_error("Error reading binlog GTID state from file '%s'.\n", buf); +end: + if (inited) + end_io_cache(&cache); + if (opened) + mysql_file_close(file_no, MYF(0)); + /* Pick the next unused seq_no from the loaded binlog state. */ + bump_seq_no_counter_if_needed( + rpl_global_gtid_binlog_state.seq_no_from_state()); + + return err; +} + + +int +MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) +{ + return rpl_global_gtid_binlog_state.get_most_recent_gtid_list(list, size); +} + + +bool +MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id, + rpl_gtid *out_gtid) +{ + rpl_gtid *gtid; + mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id))) + *out_gtid= *gtid; + mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + return gtid != NULL; +} + + +bool +MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id, + rpl_gtid *out_gtid) +{ + rpl_binlog_state::element *elem; + bool res; + + mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + elem= (rpl_binlog_state::element *) + my_hash_search(&rpl_global_gtid_binlog_state.hash, + (const uchar *)&domain_id, 0); + if (elem) + { + res= true; + *out_gtid= *elem->last_gtid; + } + else + res= false; + mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state); + + return res; +} + + +void +MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint64 seq_no) +{ + mysql_mutex_lock(&LOCK_gtid_counter); + if (global_gtid_counter < seq_no) + global_gtid_counter= seq_no; + mysql_mutex_unlock(&LOCK_gtid_counter); +} + + /** Write an event to the binary log. 
If with_annotate != NULL and *with_annotate = TRUE write also Annotate_rows before the event @@ -5379,6 +5642,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) my_org_b_tell= my_b_tell(file); mysql_mutex_lock(&LOCK_log); prev_binlog_id= current_binlog_id; + write_gtid_event(thd, true, using_trans); } else { @@ -6251,19 +6515,6 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, break; } - /* - Log "BEGIN" at the beginning of every transaction. Here, a transaction is - either a BEGIN..COMMIT block or a single statement in autocommit mode. - - Create the necessary events here, where we have the correct THD (and - thread context). - - Due to group commit the actual writing to binlog may happen in a different - thread. - */ - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE, - TRUE, 0); - entry.begin_event= &qinfo; entry.end_event= end_ev; if (cache_mngr->stmt_cache.has_incident() || cache_mngr->trx_cache.has_incident()) @@ -6639,10 +6890,8 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry) { binlog_cache_mngr *mngr= entry->cache_mngr; - if (entry->begin_event->write(&log_file)) + if (write_gtid_event(entry->thd, false, entry->using_trx_cache)) return ER_ERROR_ON_WRITE; - status_var_add(entry->thd->status_var.binlog_bytes_written, - entry->begin_event->data_written); if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE))) @@ -6782,6 +7031,8 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd, void MYSQL_BIN_LOG::close(uint exiting) { // One can't set log_type here! 
+ bool failed_to_save_state= false; + DBUG_ENTER("MYSQL_BIN_LOG::close"); DBUG_PRINT("enter",("exiting: %d", (int) exiting)); if (log_state == LOG_OPENED) @@ -6799,6 +7050,27 @@ void MYSQL_BIN_LOG::close(uint exiting) s.write(&log_file); bytes_written+= s.data_written; signal_update(); + + /* + When we shut down server, write out the binlog state to a separate + file so we do not have to scan an entire binlog file to recover it + at next server start. + + Note that this must be written and synced to disk before marking the + last binlog file as "not crashed". + */ + if (!is_relay_log && write_state_to_file()) + { + sql_print_error("Failed to save binlog GTID state during shutdown. " + "Binlog will be marked as crashed, so that crash " + "recovery can recover the state at next server " + "startup."); + /* + Leave binlog file marked as crashed, so we can recover state by + scanning it now that we failed to write out the state properly. + */ + failed_to_save_state= true; + } } #endif /* HAVE_REPLICATION */ @@ -6807,7 +7079,8 @@ void MYSQL_BIN_LOG::close(uint exiting) && !(exiting & LOG_CLOSE_DELAYED_CLOSE)) { my_off_t org_position= mysql_file_tell(log_file.file, MYF(0)); - clear_inuse_flag_when_closing(log_file.file); + if (!failed_to_save_state) + clear_inuse_flag_when_closing(log_file.file); /* Restore position so that anything we have in the IO_cache is written to the correct position. @@ -7983,9 +8256,13 @@ int TC_LOG_BINLOG::open(const char *opt_name) sql_print_information("Recovering after a crash using %s", opt_name); error= recover(&log_info, log_name, &log, (Format_description_log_event *)ev); + state_read= true; + /* Pick the next unused seq_no from the recovered binlog state. 
*/ + bump_seq_no_counter_if_needed( + rpl_global_gtid_binlog_state.seq_no_from_state()); } else - error=0; + error= read_state_from_file(); delete ev; end_io_cache(&log); @@ -8235,6 +8512,28 @@ binlog_background_thread(void *arg __attribute__((unused))) mysql_mutex_unlock(&LOCK_thread_count); thd->store_globals(); + /* + Load the slave replication GTID state from the mysql.rpl_slave_state + table. + + This is mostly so that we can start our seq_no counter from the highest + seq_no seen by a slave. This way, we have a way to tell if a transaction + logged by ourselves as master is newer or older than a replicated + transaction. + */ +#ifdef HAVE_REPLICATION + if (rpl_load_gtid_slave_state(thd)) + sql_print_warning("Failed to load slave replication state from table " + "%s.%s: %u: %s", "mysql", + rpl_gtid_slave_state_table_name.str, + thd->stmt_da->sql_errno(), thd->stmt_da->message()); +#endif + + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + binlog_background_thread_started= true; + mysql_cond_signal(&mysql_bin_log.COND_binlog_background_thread_end); + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + for (;;) { /* @@ -8333,7 +8632,16 @@ start_binlog_background_thread() binlog_background_thread, NULL)) return 1; - binlog_background_thread_started= true; + /* + Wait for the thread to have started (so we know that the slave replication + state is loaded and we have correct global_gtid_counter). 
+ */ + mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread); + while (!binlog_background_thread_started) + mysql_cond_wait(&mysql_bin_log.COND_binlog_background_thread_end, + &mysql_bin_log.LOCK_binlog_background_thread); + mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread); + return 0; } @@ -8412,6 +8720,37 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, } break; } + case GTID_LIST_EVENT: + if (first_round) + { + uint32 i; + Gtid_list_log_event *glev= (Gtid_list_log_event *)ev; + + /* Initialise the binlog state from the Gtid_list event. */ + rpl_global_gtid_binlog_state.reset(); + for (i= 0; i < glev->count; ++i) + { + if (rpl_global_gtid_binlog_state.update(&(glev->list[i]))) + goto err2; + } + } + break; + + case GTID_EVENT: + if (first_round) + { + Gtid_log_event *gev= (Gtid_log_event *)ev; + rpl_gtid gtid; + + /* Update the binlog state with any GTID logged after Gtid_list. */ + gtid.domain_id= gev->domain_id; + gtid.server_id= gev->server_id; + gtid.seq_no= gev->seq_no; + if (rpl_global_gtid_binlog_state.update(&gtid)) + goto err2; + } + break; + default: /* Nothing. */ break; |