summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc385
1 files changed, 362 insertions, 23 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 088816d7954..48c458c2607 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -120,6 +120,8 @@ static MYSQL_BIN_LOG::xid_count_per_binlog *
static bool start_binlog_background_thread();
+static rpl_binlog_state rpl_global_gtid_binlog_state;
+
/**
purge logs, master and slave sides both, related error code
convertor.
@@ -686,7 +688,8 @@ bool Log_to_csv_event_handler::
/* do a write */
if (table->field[1]->store(user_host, user_host_len, client_cs) ||
table->field[2]->store((longlong) thread_id, TRUE) ||
- table->field[3]->store((longlong) server_id, TRUE) ||
+ table->field[3]->store((longlong) global_system_variables.server_id,
+ TRUE) ||
table->field[4]->store(command_type, command_type_len, client_cs))
goto err;
@@ -883,7 +886,7 @@ bool Log_to_csv_event_handler::
table->field[8]->set_notnull();
}
- if (table->field[9]->store((longlong) server_id, TRUE))
+ if (table->field[9]->store((longlong)global_system_variables.server_id, TRUE))
goto err;
table->field[9]->set_notnull();
@@ -2938,7 +2941,7 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
bytes_written(0), file_id(1), open_count(1),
group_commit_queue(0), group_commit_queue_busy(FALSE),
num_commits(0), num_group_commits(0),
- sync_period_ptr(sync_period), sync_counter(0),
+ sync_period_ptr(sync_period), sync_counter(0), state_read(false),
is_relay_log(0), signal_cnt(0),
checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF),
relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
@@ -3132,6 +3135,9 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
DBUG_ENTER("MYSQL_BIN_LOG::open");
DBUG_PRINT("enter",("log_type: %d",(int) log_type_arg));
+ if (!is_relay_log && read_state_from_file())
+ DBUG_RETURN(1);
+
if (!is_relay_log && !binlog_background_thread_started &&
start_binlog_background_thread())
DBUG_RETURN(1);
@@ -3245,6 +3251,47 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
if (!is_relay_log)
{
char buf[FN_REFLEN];
+
+ /*
+ Output a Gtid_list_log_event at the start of the binlog file.
+
+ This is used to quickly determine which GTIDs are found in binlog
+ files earlier than this one, and which are found in this (or later)
+ binlogs.
+
+ The list gives a mapping from (domain_id, server_id) -> seq_no (so
+ this means that there is at most one entry for every unique pair
+ (domain_id, server_id) in the list). It indicates that this seq_no is
+ the last one found in an earlier binlog file for this (domain_id,
+ server_id) combination - so any higher seq_no should be search for
+ from this binlog file, or a later one.
+
+ This allows to locate the binlog file containing a given GTID by
+ scanning backwards, reading just the Gtid_list_log_event at the
+ start of each file, and scanning only the relevant binlog file when
+ found, not all binlog files.
+
+ The existence of a given entry (domain_id, server_id, seq_no)
+ guarantees only that this seq_no will not be found in this or any
+ later binlog file. It does not guarantee that it can be found it an
+ earlier binlog file, for example the file may have been purged.
+
+ If there is no entry for a given (domain_id, server_id) pair, then
+ it means that no such GTID exists in any earlier binlog. It is
+ permissible to remove such pair from future Gtid_list_log_events
+ if all previous binlog files containing such GTIDs have been purged
+ (though such optimization is not performed at the time of this
+ writing). So if there is no entry for given GTID it means that such
+ GTID should be search for in this or later binlog file, same as if
+ there had been an entry (domain_id, server_id, 0).
+ */
+
+ Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state);
+ if (gl_ev.write(&log_file))
+ goto err;
+
+ /* Output a binlog checkpoint event at the start of the binlog file. */
+
/*
Construct an entry in the binlog_xid_count_list for the new binlog
file (we will not link it into the list until we know the new file
@@ -3671,7 +3718,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log)
const char* save_name;
DBUG_ENTER("reset_logs");
- ha_reset_logs(thd);
+ if (thd)
+ ha_reset_logs(thd);
/*
We need to get both locks to be sure that no one is trying to
write to the index log file.
@@ -3796,6 +3844,14 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log)
break;
}
+ if (!is_relay_log)
+ {
+ rpl_global_gtid_binlog_state.reset();
+ mysql_mutex_lock(&LOCK_gtid_counter);
+ global_gtid_counter= 0;
+ mysql_mutex_unlock(&LOCK_gtid_counter);
+ }
+
/* Start logging with a new file */
close(LOG_CLOSE_INDEX | LOG_CLOSE_TO_BE_OPENED);
if ((error= my_delete_allow_opened(index_file_name, MYF(0)))) // Reset (open will update)
@@ -5310,6 +5366,213 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
DBUG_RETURN(error);
}
+
+/* Generate a new global transaction ID, and write it to the binlog */
+bool
+MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
+ bool is_transactional)
+{
+ rpl_gtid gtid;
+ uint64 seq_no;
+
+ seq_no= thd->variables.gtid_seq_no;
+ /*
+ Reset the session variable gtid_seq_no, to reduce the risk of accidentally
+ producing a duplicate GTID.
+ */
+ thd->variables.gtid_seq_no= 0;
+ if (seq_no != 0)
+ {
+ /*
+ If we see a higher sequence number, use that one as the basis of any
+ later generated sequence numbers.
+ */
+ bump_seq_no_counter_if_needed(seq_no);
+ }
+ else
+ {
+ mysql_mutex_lock(&LOCK_gtid_counter);
+ seq_no= ++global_gtid_counter;
+ mysql_mutex_unlock(&LOCK_gtid_counter);
+ }
+ gtid.seq_no= seq_no;
+ gtid.domain_id= thd->variables.gtid_domain_id;
+
+ Gtid_log_event gtid_event(thd, gtid.seq_no, gtid.domain_id, standalone,
+ LOG_EVENT_SUPPRESS_USE_F, is_transactional);
+ gtid.server_id= gtid_event.server_id;
+
+ /* Write the event to the binary log. */
+ if (gtid_event.write(&mysql_bin_log.log_file))
+ return true;
+ status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written);
+
+ /* Update the replication state (last GTID in each replication domain). */
+ mysql_mutex_lock(&LOCK_rpl_gtid_state);
+ rpl_global_gtid_binlog_state.update(&gtid);
+ mysql_mutex_unlock(&LOCK_rpl_gtid_state);
+ return false;
+}
+
+
+int
+MYSQL_BIN_LOG::write_state_to_file()
+{
+ File file_no;
+ IO_CACHE cache;
+ char buf[FN_REFLEN];
+ int err;
+ bool opened= false;
+ bool inited= false;
+
+ fn_format(buf, opt_bin_logname, mysql_data_home, ".state",
+ MY_UNPACK_FILENAME);
+ if ((file_no= mysql_file_open(key_file_binlog_state, buf,
+ O_RDWR|O_CREAT|O_TRUNC|O_BINARY,
+ MYF(MY_WME))) < 0)
+ {
+ err= 1;
+ goto err;
+ }
+ opened= true;
+ if ((err= init_io_cache(&cache, file_no, IO_SIZE, WRITE_CACHE, 0, 0,
+ MYF(MY_WME|MY_WAIT_IF_FULL))))
+ goto err;
+ inited= true;
+ if ((err= rpl_global_gtid_binlog_state.write_to_iocache(&cache)))
+ goto err;
+ inited= false;
+ if ((err= end_io_cache(&cache)))
+ goto err;
+ if ((err= mysql_file_sync(file_no, MYF(MY_WME|MY_SYNC_FILESIZE))))
+ goto err;
+ goto end;
+
+err:
+ sql_print_error("Error writing binlog state to file '%s'.\n", buf);
+ if (inited)
+ end_io_cache(&cache);
+end:
+ if (opened)
+ mysql_file_close(file_no, MYF(0));
+
+ return err;
+}
+
+
+int
+MYSQL_BIN_LOG::read_state_from_file()
+{
+ File file_no;
+ IO_CACHE cache;
+ char buf[FN_REFLEN];
+ int err;
+ bool opened= false;
+ bool inited= false;
+
+ if (state_read)
+ return 0;
+ state_read= true;
+
+ fn_format(buf, opt_bin_logname, mysql_data_home, ".state",
+ MY_UNPACK_FILENAME);
+ if ((file_no= mysql_file_open(key_file_binlog_state, buf,
+ O_RDONLY|O_BINARY, MYF(0))) < 0)
+ {
+ if (my_errno != ENOENT)
+ {
+ err= 1;
+ goto err;
+ }
+ else
+ {
+ /*
+ If the state file does not exist, this is the first server startup
+ with GTID enabled. So initialize to empty state.
+ */
+ rpl_global_gtid_binlog_state.reset();
+ err= 0;
+ goto end;
+ }
+ }
+ opened= true;
+ if ((err= init_io_cache(&cache, file_no, IO_SIZE, READ_CACHE, 0, 0,
+ MYF(MY_WME|MY_WAIT_IF_FULL))))
+ goto err;
+ inited= true;
+ if ((err= rpl_global_gtid_binlog_state.read_from_iocache(&cache)))
+ goto err;
+ goto end;
+
+err:
+ sql_print_error("Error reading binlog GTID state from file '%s'.\n", buf);
+end:
+ if (inited)
+ end_io_cache(&cache);
+ if (opened)
+ mysql_file_close(file_no, MYF(0));
+ /* Pick the next unused seq_no from the loaded binlog state. */
+ bump_seq_no_counter_if_needed(
+ rpl_global_gtid_binlog_state.seq_no_from_state());
+
+ return err;
+}
+
+
+int
+MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
+{
+ return rpl_global_gtid_binlog_state.get_most_recent_gtid_list(list, size);
+}
+
+
+bool
+MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id,
+ rpl_gtid *out_gtid)
+{
+ rpl_gtid *gtid;
+ mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id)))
+ *out_gtid= *gtid;
+ mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ return gtid != NULL;
+}
+
+
+bool
+MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id,
+ rpl_gtid *out_gtid)
+{
+ rpl_binlog_state::element *elem;
+ bool res;
+
+ mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ elem= (rpl_binlog_state::element *)
+ my_hash_search(&rpl_global_gtid_binlog_state.hash,
+ (const uchar *)&domain_id, 0);
+ if (elem)
+ {
+ res= true;
+ *out_gtid= *elem->last_gtid;
+ }
+ else
+ res= false;
+ mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+
+ return res;
+}
+
+
+void
+MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint64 seq_no)
+{
+ mysql_mutex_lock(&LOCK_gtid_counter);
+ if (global_gtid_counter < seq_no)
+ global_gtid_counter= seq_no;
+ mysql_mutex_unlock(&LOCK_gtid_counter);
+}
+
+
/**
Write an event to the binary log. If with_annotate != NULL and
*with_annotate = TRUE write also Annotate_rows before the event
@@ -5379,6 +5642,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log);
prev_binlog_id= current_binlog_id;
+ write_gtid_event(thd, true, using_trans);
}
else
{
@@ -6251,19 +6515,6 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
break;
}
- /*
- Log "BEGIN" at the beginning of every transaction. Here, a transaction is
- either a BEGIN..COMMIT block or a single statement in autocommit mode.
-
- Create the necessary events here, where we have the correct THD (and
- thread context).
-
- Due to group commit the actual writing to binlog may happen in a different
- thread.
- */
- Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE,
- TRUE, 0);
- entry.begin_event= &qinfo;
entry.end_event= end_ev;
if (cache_mngr->stmt_cache.has_incident() ||
cache_mngr->trx_cache.has_incident())
@@ -6639,10 +6890,8 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry)
{
binlog_cache_mngr *mngr= entry->cache_mngr;
- if (entry->begin_event->write(&log_file))
+ if (write_gtid_event(entry->thd, false, entry->using_trx_cache))
return ER_ERROR_ON_WRITE;
- status_var_add(entry->thd->status_var.binlog_bytes_written,
- entry->begin_event->data_written);
if (entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE)))
@@ -6782,6 +7031,8 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
void MYSQL_BIN_LOG::close(uint exiting)
{ // One can't set log_type here!
+ bool failed_to_save_state= false;
+
DBUG_ENTER("MYSQL_BIN_LOG::close");
DBUG_PRINT("enter",("exiting: %d", (int) exiting));
if (log_state == LOG_OPENED)
@@ -6799,6 +7050,27 @@ void MYSQL_BIN_LOG::close(uint exiting)
s.write(&log_file);
bytes_written+= s.data_written;
signal_update();
+
+ /*
+ When we shut down server, write out the binlog state to a separate
+ file so we do not have to scan an entire binlog file to recover it
+ at next server start.
+
+ Note that this must be written and synced to disk before marking the
+ last binlog file as "not crashed".
+ */
+ if (!is_relay_log && write_state_to_file())
+ {
+ sql_print_error("Failed to save binlog GTID state during shutdown. "
+ "Binlog will be marked as crashed, so that crash "
+ "recovery can recover the state at next server "
+ "startup.");
+ /*
+ Leave binlog file marked as crashed, so we can recover state by
+ scanning it now that we failed to write out the state properly.
+ */
+ failed_to_save_state= true;
+ }
}
#endif /* HAVE_REPLICATION */
@@ -6807,7 +7079,8 @@ void MYSQL_BIN_LOG::close(uint exiting)
&& !(exiting & LOG_CLOSE_DELAYED_CLOSE))
{
my_off_t org_position= mysql_file_tell(log_file.file, MYF(0));
- clear_inuse_flag_when_closing(log_file.file);
+ if (!failed_to_save_state)
+ clear_inuse_flag_when_closing(log_file.file);
/*
Restore position so that anything we have in the IO_cache is written
to the correct position.
@@ -7983,9 +8256,13 @@ int TC_LOG_BINLOG::open(const char *opt_name)
sql_print_information("Recovering after a crash using %s", opt_name);
error= recover(&log_info, log_name, &log,
(Format_description_log_event *)ev);
+ state_read= true;
+ /* Pick the next unused seq_no from the recovered binlog state. */
+ bump_seq_no_counter_if_needed(
+ rpl_global_gtid_binlog_state.seq_no_from_state());
}
else
- error=0;
+ error= read_state_from_file();
delete ev;
end_io_cache(&log);
@@ -8235,6 +8512,28 @@ binlog_background_thread(void *arg __attribute__((unused)))
mysql_mutex_unlock(&LOCK_thread_count);
thd->store_globals();
+ /*
+ Load the slave replication GTID state from the mysql.rpl_slave_state
+ table.
+
+ This is mostly so that we can start our seq_no counter from the highest
+ seq_no seen by a slave. This way, we have a way to tell if a transaction
+ logged by ourselves as master is newer or older than a replicated
+ transaction.
+ */
+#ifdef HAVE_REPLICATION
+ if (rpl_load_gtid_slave_state(thd))
+ sql_print_warning("Failed to load slave replication state from table "
+ "%s.%s: %u: %s", "mysql",
+ rpl_gtid_slave_state_table_name.str,
+ thd->stmt_da->sql_errno(), thd->stmt_da->message());
+#endif
+
+ mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread);
+ binlog_background_thread_started= true;
+ mysql_cond_signal(&mysql_bin_log.COND_binlog_background_thread_end);
+ mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread);
+
for (;;)
{
/*
@@ -8333,7 +8632,16 @@ start_binlog_background_thread()
binlog_background_thread, NULL))
return 1;
- binlog_background_thread_started= true;
+ /*
+ Wait for the thread to have started (so we know that the slave replication
+ state is loaded and we have correct global_gtid_counter).
+ */
+ mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread);
+ while (!binlog_background_thread_started)
+ mysql_cond_wait(&mysql_bin_log.COND_binlog_background_thread_end,
+ &mysql_bin_log.LOCK_binlog_background_thread);
+ mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread);
+
return 0;
}
@@ -8412,6 +8720,37 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
}
break;
}
+ case GTID_LIST_EVENT:
+ if (first_round)
+ {
+ uint32 i;
+ Gtid_list_log_event *glev= (Gtid_list_log_event *)ev;
+
+ /* Initialise the binlog state from the Gtid_list event. */
+ rpl_global_gtid_binlog_state.reset();
+ for (i= 0; i < glev->count; ++i)
+ {
+ if (rpl_global_gtid_binlog_state.update(&(glev->list[i])))
+ goto err2;
+ }
+ }
+ break;
+
+ case GTID_EVENT:
+ if (first_round)
+ {
+ Gtid_log_event *gev= (Gtid_log_event *)ev;
+ rpl_gtid gtid;
+
+ /* Update the binlog state with any GTID logged after Gtid_list. */
+ gtid.domain_id= gev->domain_id;
+ gtid.server_id= gev->server_id;
+ gtid.seq_no= gev->seq_no;
+ if (rpl_global_gtid_binlog_state.update(&gtid))
+ goto err2;
+ }
+ break;
+
default:
/* Nothing. */
break;