summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-04-17 15:17:01 +0200
committerunknown <knielsen@knielsen-hq.org>2013-04-17 15:17:01 +0200
commit0e7410a154560486ce4e0c975f122a0b15853bb1 (patch)
treee65ba67f602f6c99a52a1953b2172043476a5930 /sql
parentf398ced7f05e64b0f2bdb8e0a70de907aff9e1eb (diff)
parent30b2c64c4e87f8924fe817f5c607ae90a990e9e5 (diff)
downloadmariadb-git-0e7410a154560486ce4e0c975f122a0b15853bb1.tar.gz
Merge 10.0-base -> 10.0 (GTID).
Diffstat (limited to 'sql')
-rw-r--r--sql/CMakeLists.txt1
-rw-r--r--sql/event_parse_data.cc4
-rw-r--r--sql/item_create.cc24
-rw-r--r--sql/item_func.cc10
-rw-r--r--sql/item_strfunc.cc41
-rw-r--r--sql/item_strfunc.h11
-rw-r--r--sql/lex.h2
-rw-r--r--sql/log.cc385
-rw-r--r--sql/log.h14
-rw-r--r--sql/log_event.cc554
-rw-r--r--sql/log_event.h256
-rw-r--r--sql/log_event_old.cc2
-rw-r--r--sql/mysqld.cc74
-rw-r--r--sql/mysqld.h6
-rw-r--r--sql/repl_failsafe.cc9
-rw-r--r--sql/rpl_gtid.cc1122
-rw-r--r--sql/rpl_gtid.h179
-rw-r--r--sql/rpl_handler.cc4
-rw-r--r--sql/rpl_injector.cc8
-rw-r--r--sql/rpl_mi.cc62
-rw-r--r--sql/rpl_mi.h7
-rw-r--r--sql/rpl_rli.cc165
-rw-r--r--sql/rpl_rli.h14
-rw-r--r--sql/share/errmsg-utf8.txt20
-rw-r--r--sql/slave.cc272
-rw-r--r--sql/sql_base.cc15
-rw-r--r--sql/sql_class.cc8
-rw-r--r--sql/sql_class.h10
-rw-r--r--sql/sql_lex.h5
-rw-r--r--sql/sql_parse.cc4
-rw-r--r--sql/sql_repl.cc995
-rw-r--r--sql/sql_repl.h8
-rw-r--r--sql/sql_show.cc22
-rw-r--r--sql/sql_yacc.yy14
-rw-r--r--sql/sys_vars.cc133
-rw-r--r--sql/sys_vars.h40
-rw-r--r--sql/table.cc40
-rw-r--r--sql/table.h2
38 files changed, 4382 insertions, 160 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index aa916a3725a..0b4943851f0 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -89,6 +89,7 @@ SET (SQL_SOURCE
threadpool_common.cc
../sql-common/mysql_async.c
my_apc.cc my_apc.h
+ rpl_gtid.cc
${GEN_SOURCES}
${MYSYS_LIBWRAP_SOURCE}
)
diff --git a/sql/event_parse_data.cc b/sql/event_parse_data.cc
index ad812a6aa5d..4316a9f1fb8 100644
--- a/sql/event_parse_data.cc
+++ b/sql/event_parse_data.cc
@@ -574,8 +574,8 @@ void Event_parse_data::check_originator_id(THD *thd)
status= Event_parse_data::SLAVESIDE_DISABLED;
status_changed= true;
}
- originator = thd->server_id;
+ originator = thd->variables.server_id;
}
else
- originator = server_id;
+ originator = global_system_variables.server_id;
}
diff --git a/sql/item_create.cc b/sql/item_create.cc
index ba6eb7ff603..c1cefed6f8b 100644
--- a/sql/item_create.cc
+++ b/sql/item_create.cc
@@ -447,6 +447,19 @@ protected:
};
+class Create_func_binlog_gtid_pos : public Create_func_arg2
+{
+public:
+ virtual Item *create_2_arg(THD *thd, Item *arg1, Item *arg2);
+
+ static Create_func_binlog_gtid_pos s_singleton;
+
+protected:
+ Create_func_binlog_gtid_pos() {}
+ virtual ~Create_func_binlog_gtid_pos() {}
+};
+
+
class Create_func_bit_count : public Create_func_arg1
{
public:
@@ -3100,6 +3113,16 @@ Create_func_bin::create_1_arg(THD *thd, Item *arg1)
}
+Create_func_binlog_gtid_pos Create_func_binlog_gtid_pos::s_singleton;
+
+Item*
+Create_func_binlog_gtid_pos::create_2_arg(THD *thd, Item *arg1, Item *arg2)
+{
+ thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
+ return new (thd->mem_root) Item_func_binlog_gtid_pos(arg1, arg2);
+}
+
+
Create_func_bit_count Create_func_bit_count::s_singleton;
Item*
@@ -5322,6 +5345,7 @@ static Native_func_registry func_array[] =
{ { C_STRING_WITH_LEN("ATAN2") }, BUILDER(Create_func_atan)},
{ { C_STRING_WITH_LEN("BENCHMARK") }, BUILDER(Create_func_benchmark)},
{ { C_STRING_WITH_LEN("BIN") }, BUILDER(Create_func_bin)},
+ { { C_STRING_WITH_LEN("BINLOG_GTID_POS") }, BUILDER(Create_func_binlog_gtid_pos)},
{ { C_STRING_WITH_LEN("BIT_COUNT") }, BUILDER(Create_func_bit_count)},
{ { C_STRING_WITH_LEN("BIT_LENGTH") }, BUILDER(Create_func_bit_length)},
{ { C_STRING_WITH_LEN("BUFFER") }, GEOM_BUILDER(Create_func_buffer)},
diff --git a/sql/item_func.cc b/sql/item_func.cc
index 4df7638370a..d0fca63688a 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -5740,6 +5740,14 @@ longlong Item_func_get_system_var::val_int()
{
THD *thd= current_thd;
+ DBUG_EXECUTE_IF("simulate_non_gtid_aware_master",
+ {
+ if (0 == strcmp("gtid_domain_id", var->name.str))
+ {
+ my_error(ER_VAR_CANT_BE_READ, MYF(0), var->name.str);
+ return 0;
+ }
+ });
if (cache_present && thd->query_id == used_query_id)
{
if (cache_present & GET_SYS_VAR_CACHE_LONG)
@@ -6727,7 +6735,7 @@ ulonglong uuid_value;
void uuid_short_init()
{
- uuid_value= ((((ulonglong) server_id) << 56) +
+ uuid_value= ((((ulonglong) global_system_variables.server_id) << 56) +
(((ulonglong) server_start_time) << 24));
}
diff --git a/sql/item_strfunc.cc b/sql/item_strfunc.cc
index 3df03e63297..532c6d7ff08 100644
--- a/sql/item_strfunc.cc
+++ b/sql/item_strfunc.cc
@@ -59,6 +59,7 @@ C_MODE_START
#include "../mysys/my_static.h" // For soundex_map
C_MODE_END
#include "sql_show.h" // append_identifier
+#include <sql_repl.h>
/**
@todo Remove this. It is not safe to use a shared String object.
@@ -2667,6 +2668,46 @@ err:
}
+void Item_func_binlog_gtid_pos::fix_length_and_dec()
+{
+ collation.set(system_charset_info);
+ max_length= MAX_BLOB_WIDTH;
+ maybe_null= 1;
+}
+
+
+String *Item_func_binlog_gtid_pos::val_str(String *str)
+{
+ DBUG_ASSERT(fixed == 1);
+#ifndef HAVE_REPLICATION
+ null_value= 0;
+ str->copy("", 0, system_charset_info);
+ return str;
+#else
+ String name_str, *name;
+ longlong pos;
+
+ if (args[0]->null_value || args[1]->null_value)
+ goto err;
+
+ name= args[0]->val_str(&name_str);
+ pos= args[1]->val_int();
+
+ if (pos < 0 || pos > UINT_MAX32)
+ goto err;
+
+ if (gtid_state_from_binlog_pos(name->c_ptr_safe(), (uint32)pos, str))
+ goto err;
+ null_value= 0;
+ return str;
+
+err:
+ null_value= 1;
+ return NULL;
+#endif /* !HAVE_REPLICATION */
+}
+
+
void Item_func_rpad::fix_length_and_dec()
{
// Handle character set for args[0] and args[2].
diff --git a/sql/item_strfunc.h b/sql/item_strfunc.h
index 00863e9af2b..89d7fa67f6b 100644
--- a/sql/item_strfunc.h
+++ b/sql/item_strfunc.h
@@ -598,6 +598,17 @@ public:
};
+class Item_func_binlog_gtid_pos :public Item_str_func
+{
+ String tmp_value;
+public:
+ Item_func_binlog_gtid_pos(Item *arg1,Item *arg2) :Item_str_func(arg1,arg2) {}
+ String *val_str(String *);
+ void fix_length_and_dec();
+ const char *func_name() const { return "binlog_gtid_pos"; }
+};
+
+
class Item_func_rpad :public Item_str_func
{
String tmp_value, rpad_str;
diff --git a/sql/lex.h b/sql/lex.h
index 101880597d5..c579ea84533 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -77,6 +77,7 @@ static SYMBOL symbols[] = {
{ "AUTHORS", SYM(AUTHORS_SYM)},
{ "AUTO_INCREMENT", SYM(AUTO_INC)},
{ "AUTOEXTEND_SIZE", SYM(AUTOEXTEND_SIZE_SYM)},
+ { "AUTO", SYM(AUTO_SYM)},
{ "AVG", SYM(AVG_SYM)},
{ "AVG_ROW_LENGTH", SYM(AVG_ROW_LENGTH)},
{ "BACKUP", SYM(BACKUP_SYM)},
@@ -329,6 +330,7 @@ static SYMBOL symbols[] = {
{ "LOW_PRIORITY", SYM(LOW_PRIORITY)},
{ "MASTER", SYM(MASTER_SYM)},
{ "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)},
+ { "MASTER_USE_GTID", SYM(MASTER_USE_GTID_SYM)},
{ "MASTER_HOST", SYM(MASTER_HOST_SYM)},
{ "MASTER_LOG_FILE", SYM(MASTER_LOG_FILE_SYM)},
{ "MASTER_LOG_POS", SYM(MASTER_LOG_POS_SYM)},
diff --git a/sql/log.cc b/sql/log.cc
index 088816d7954..48c458c2607 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -120,6 +120,8 @@ static MYSQL_BIN_LOG::xid_count_per_binlog *
static bool start_binlog_background_thread();
+static rpl_binlog_state rpl_global_gtid_binlog_state;
+
/**
purge logs, master and slave sides both, related error code
convertor.
@@ -686,7 +688,8 @@ bool Log_to_csv_event_handler::
/* do a write */
if (table->field[1]->store(user_host, user_host_len, client_cs) ||
table->field[2]->store((longlong) thread_id, TRUE) ||
- table->field[3]->store((longlong) server_id, TRUE) ||
+ table->field[3]->store((longlong) global_system_variables.server_id,
+ TRUE) ||
table->field[4]->store(command_type, command_type_len, client_cs))
goto err;
@@ -883,7 +886,7 @@ bool Log_to_csv_event_handler::
table->field[8]->set_notnull();
}
- if (table->field[9]->store((longlong) server_id, TRUE))
+ if (table->field[9]->store((longlong)global_system_variables.server_id, TRUE))
goto err;
table->field[9]->set_notnull();
@@ -2938,7 +2941,7 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
bytes_written(0), file_id(1), open_count(1),
group_commit_queue(0), group_commit_queue_busy(FALSE),
num_commits(0), num_group_commits(0),
- sync_period_ptr(sync_period), sync_counter(0),
+ sync_period_ptr(sync_period), sync_counter(0), state_read(false),
is_relay_log(0), signal_cnt(0),
checksum_alg_reset(BINLOG_CHECKSUM_ALG_UNDEF),
relay_log_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
@@ -3132,6 +3135,9 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
DBUG_ENTER("MYSQL_BIN_LOG::open");
DBUG_PRINT("enter",("log_type: %d",(int) log_type_arg));
+ if (!is_relay_log && read_state_from_file())
+ DBUG_RETURN(1);
+
if (!is_relay_log && !binlog_background_thread_started &&
start_binlog_background_thread())
DBUG_RETURN(1);
@@ -3245,6 +3251,47 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
if (!is_relay_log)
{
char buf[FN_REFLEN];
+
+ /*
+ Output a Gtid_list_log_event at the start of the binlog file.
+
+ This is used to quickly determine which GTIDs are found in binlog
+ files earlier than this one, and which are found in this (or later)
+ binlogs.
+
+ The list gives a mapping from (domain_id, server_id) -> seq_no (so
+ this means that there is at most one entry for every unique pair
+ (domain_id, server_id) in the list). It indicates that this seq_no is
+ the last one found in an earlier binlog file for this (domain_id,
+ server_id) combination - so any higher seq_no should be search for
+ from this binlog file, or a later one.
+
+ This allows to locate the binlog file containing a given GTID by
+ scanning backwards, reading just the Gtid_list_log_event at the
+ start of each file, and scanning only the relevant binlog file when
+ found, not all binlog files.
+
+ The existence of a given entry (domain_id, server_id, seq_no)
+ guarantees only that this seq_no will not be found in this or any
+ later binlog file. It does not guarantee that it can be found it an
+ earlier binlog file, for example the file may have been purged.
+
+ If there is no entry for a given (domain_id, server_id) pair, then
+ it means that no such GTID exists in any earlier binlog. It is
+ permissible to remove such pair from future Gtid_list_log_events
+ if all previous binlog files containing such GTIDs have been purged
+ (though such optimization is not performed at the time of this
+ writing). So if there is no entry for given GTID it means that such
+ GTID should be search for in this or later binlog file, same as if
+ there had been an entry (domain_id, server_id, 0).
+ */
+
+ Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state);
+ if (gl_ev.write(&log_file))
+ goto err;
+
+ /* Output a binlog checkpoint event at the start of the binlog file. */
+
/*
Construct an entry in the binlog_xid_count_list for the new binlog
file (we will not link it into the list until we know the new file
@@ -3671,7 +3718,8 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log)
const char* save_name;
DBUG_ENTER("reset_logs");
- ha_reset_logs(thd);
+ if (thd)
+ ha_reset_logs(thd);
/*
We need to get both locks to be sure that no one is trying to
write to the index log file.
@@ -3796,6 +3844,14 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log)
break;
}
+ if (!is_relay_log)
+ {
+ rpl_global_gtid_binlog_state.reset();
+ mysql_mutex_lock(&LOCK_gtid_counter);
+ global_gtid_counter= 0;
+ mysql_mutex_unlock(&LOCK_gtid_counter);
+ }
+
/* Start logging with a new file */
close(LOG_CLOSE_INDEX | LOG_CLOSE_TO_BE_OPENED);
if ((error= my_delete_allow_opened(index_file_name, MYF(0)))) // Reset (open will update)
@@ -5310,6 +5366,213 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
DBUG_RETURN(error);
}
+
+/* Generate a new global transaction ID, and write it to the binlog */
+bool
+MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
+ bool is_transactional)
+{
+ rpl_gtid gtid;
+ uint64 seq_no;
+
+ seq_no= thd->variables.gtid_seq_no;
+ /*
+ Reset the session variable gtid_seq_no, to reduce the risk of accidentally
+ producing a duplicate GTID.
+ */
+ thd->variables.gtid_seq_no= 0;
+ if (seq_no != 0)
+ {
+ /*
+ If we see a higher sequence number, use that one as the basis of any
+ later generated sequence numbers.
+ */
+ bump_seq_no_counter_if_needed(seq_no);
+ }
+ else
+ {
+ mysql_mutex_lock(&LOCK_gtid_counter);
+ seq_no= ++global_gtid_counter;
+ mysql_mutex_unlock(&LOCK_gtid_counter);
+ }
+ gtid.seq_no= seq_no;
+ gtid.domain_id= thd->variables.gtid_domain_id;
+
+ Gtid_log_event gtid_event(thd, gtid.seq_no, gtid.domain_id, standalone,
+ LOG_EVENT_SUPPRESS_USE_F, is_transactional);
+ gtid.server_id= gtid_event.server_id;
+
+ /* Write the event to the binary log. */
+ if (gtid_event.write(&mysql_bin_log.log_file))
+ return true;
+ status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written);
+
+ /* Update the replication state (last GTID in each replication domain). */
+ mysql_mutex_lock(&LOCK_rpl_gtid_state);
+ rpl_global_gtid_binlog_state.update(&gtid);
+ mysql_mutex_unlock(&LOCK_rpl_gtid_state);
+ return false;
+}
+
+
+int
+MYSQL_BIN_LOG::write_state_to_file()
+{
+ File file_no;
+ IO_CACHE cache;
+ char buf[FN_REFLEN];
+ int err;
+ bool opened= false;
+ bool inited= false;
+
+ fn_format(buf, opt_bin_logname, mysql_data_home, ".state",
+ MY_UNPACK_FILENAME);
+ if ((file_no= mysql_file_open(key_file_binlog_state, buf,
+ O_RDWR|O_CREAT|O_TRUNC|O_BINARY,
+ MYF(MY_WME))) < 0)
+ {
+ err= 1;
+ goto err;
+ }
+ opened= true;
+ if ((err= init_io_cache(&cache, file_no, IO_SIZE, WRITE_CACHE, 0, 0,
+ MYF(MY_WME|MY_WAIT_IF_FULL))))
+ goto err;
+ inited= true;
+ if ((err= rpl_global_gtid_binlog_state.write_to_iocache(&cache)))
+ goto err;
+ inited= false;
+ if ((err= end_io_cache(&cache)))
+ goto err;
+ if ((err= mysql_file_sync(file_no, MYF(MY_WME|MY_SYNC_FILESIZE))))
+ goto err;
+ goto end;
+
+err:
+ sql_print_error("Error writing binlog state to file '%s'.\n", buf);
+ if (inited)
+ end_io_cache(&cache);
+end:
+ if (opened)
+ mysql_file_close(file_no, MYF(0));
+
+ return err;
+}
+
+
+int
+MYSQL_BIN_LOG::read_state_from_file()
+{
+ File file_no;
+ IO_CACHE cache;
+ char buf[FN_REFLEN];
+ int err;
+ bool opened= false;
+ bool inited= false;
+
+ if (state_read)
+ return 0;
+ state_read= true;
+
+ fn_format(buf, opt_bin_logname, mysql_data_home, ".state",
+ MY_UNPACK_FILENAME);
+ if ((file_no= mysql_file_open(key_file_binlog_state, buf,
+ O_RDONLY|O_BINARY, MYF(0))) < 0)
+ {
+ if (my_errno != ENOENT)
+ {
+ err= 1;
+ goto err;
+ }
+ else
+ {
+ /*
+ If the state file does not exist, this is the first server startup
+ with GTID enabled. So initialize to empty state.
+ */
+ rpl_global_gtid_binlog_state.reset();
+ err= 0;
+ goto end;
+ }
+ }
+ opened= true;
+ if ((err= init_io_cache(&cache, file_no, IO_SIZE, READ_CACHE, 0, 0,
+ MYF(MY_WME|MY_WAIT_IF_FULL))))
+ goto err;
+ inited= true;
+ if ((err= rpl_global_gtid_binlog_state.read_from_iocache(&cache)))
+ goto err;
+ goto end;
+
+err:
+ sql_print_error("Error reading binlog GTID state from file '%s'.\n", buf);
+end:
+ if (inited)
+ end_io_cache(&cache);
+ if (opened)
+ mysql_file_close(file_no, MYF(0));
+ /* Pick the next unused seq_no from the loaded binlog state. */
+ bump_seq_no_counter_if_needed(
+ rpl_global_gtid_binlog_state.seq_no_from_state());
+
+ return err;
+}
+
+
+int
+MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
+{
+ return rpl_global_gtid_binlog_state.get_most_recent_gtid_list(list, size);
+}
+
+
+bool
+MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id,
+ rpl_gtid *out_gtid)
+{
+ rpl_gtid *gtid;
+ mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id)))
+ *out_gtid= *gtid;
+ mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ return gtid != NULL;
+}
+
+
+bool
+MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id,
+ rpl_gtid *out_gtid)
+{
+ rpl_binlog_state::element *elem;
+ bool res;
+
+ mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ elem= (rpl_binlog_state::element *)
+ my_hash_search(&rpl_global_gtid_binlog_state.hash,
+ (const uchar *)&domain_id, 0);
+ if (elem)
+ {
+ res= true;
+ *out_gtid= *elem->last_gtid;
+ }
+ else
+ res= false;
+ mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+
+ return res;
+}
+
+
+void
+MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint64 seq_no)
+{
+ mysql_mutex_lock(&LOCK_gtid_counter);
+ if (global_gtid_counter < seq_no)
+ global_gtid_counter= seq_no;
+ mysql_mutex_unlock(&LOCK_gtid_counter);
+}
+
+
/**
Write an event to the binary log. If with_annotate != NULL and
*with_annotate = TRUE write also Annotate_rows before the event
@@ -5379,6 +5642,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log);
prev_binlog_id= current_binlog_id;
+ write_gtid_event(thd, true, using_trans);
}
else
{
@@ -6251,19 +6515,6 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
break;
}
- /*
- Log "BEGIN" at the beginning of every transaction. Here, a transaction is
- either a BEGIN..COMMIT block or a single statement in autocommit mode.
-
- Create the necessary events here, where we have the correct THD (and
- thread context).
-
- Due to group commit the actual writing to binlog may happen in a different
- thread.
- */
- Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE,
- TRUE, 0);
- entry.begin_event= &qinfo;
entry.end_event= end_ev;
if (cache_mngr->stmt_cache.has_incident() ||
cache_mngr->trx_cache.has_incident())
@@ -6639,10 +6890,8 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry)
{
binlog_cache_mngr *mngr= entry->cache_mngr;
- if (entry->begin_event->write(&log_file))
+ if (write_gtid_event(entry->thd, false, entry->using_trx_cache))
return ER_ERROR_ON_WRITE;
- status_var_add(entry->thd->status_var.binlog_bytes_written,
- entry->begin_event->data_written);
if (entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE)))
@@ -6782,6 +7031,8 @@ int MYSQL_BIN_LOG::wait_for_update_bin_log(THD* thd,
void MYSQL_BIN_LOG::close(uint exiting)
{ // One can't set log_type here!
+ bool failed_to_save_state= false;
+
DBUG_ENTER("MYSQL_BIN_LOG::close");
DBUG_PRINT("enter",("exiting: %d", (int) exiting));
if (log_state == LOG_OPENED)
@@ -6799,6 +7050,27 @@ void MYSQL_BIN_LOG::close(uint exiting)
s.write(&log_file);
bytes_written+= s.data_written;
signal_update();
+
+ /*
+ When we shut down server, write out the binlog state to a separate
+ file so we do not have to scan an entire binlog file to recover it
+ at next server start.
+
+ Note that this must be written and synced to disk before marking the
+ last binlog file as "not crashed".
+ */
+ if (!is_relay_log && write_state_to_file())
+ {
+ sql_print_error("Failed to save binlog GTID state during shutdown. "
+ "Binlog will be marked as crashed, so that crash "
+ "recovery can recover the state at next server "
+ "startup.");
+ /*
+ Leave binlog file marked as crashed, so we can recover state by
+ scanning it now that we failed to write out the state properly.
+ */
+ failed_to_save_state= true;
+ }
}
#endif /* HAVE_REPLICATION */
@@ -6807,7 +7079,8 @@ void MYSQL_BIN_LOG::close(uint exiting)
&& !(exiting & LOG_CLOSE_DELAYED_CLOSE))
{
my_off_t org_position= mysql_file_tell(log_file.file, MYF(0));
- clear_inuse_flag_when_closing(log_file.file);
+ if (!failed_to_save_state)
+ clear_inuse_flag_when_closing(log_file.file);
/*
Restore position so that anything we have in the IO_cache is written
to the correct position.
@@ -7983,9 +8256,13 @@ int TC_LOG_BINLOG::open(const char *opt_name)
sql_print_information("Recovering after a crash using %s", opt_name);
error= recover(&log_info, log_name, &log,
(Format_description_log_event *)ev);
+ state_read= true;
+ /* Pick the next unused seq_no from the recovered binlog state. */
+ bump_seq_no_counter_if_needed(
+ rpl_global_gtid_binlog_state.seq_no_from_state());
}
else
- error=0;
+ error= read_state_from_file();
delete ev;
end_io_cache(&log);
@@ -8235,6 +8512,28 @@ binlog_background_thread(void *arg __attribute__((unused)))
mysql_mutex_unlock(&LOCK_thread_count);
thd->store_globals();
+ /*
+ Load the slave replication GTID state from the mysql.rpl_slave_state
+ table.
+
+ This is mostly so that we can start our seq_no counter from the highest
+ seq_no seen by a slave. This way, we have a way to tell if a transaction
+ logged by ourselves as master is newer or older than a replicated
+ transaction.
+ */
+#ifdef HAVE_REPLICATION
+ if (rpl_load_gtid_slave_state(thd))
+ sql_print_warning("Failed to load slave replication state from table "
+ "%s.%s: %u: %s", "mysql",
+ rpl_gtid_slave_state_table_name.str,
+ thd->stmt_da->sql_errno(), thd->stmt_da->message());
+#endif
+
+ mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread);
+ binlog_background_thread_started= true;
+ mysql_cond_signal(&mysql_bin_log.COND_binlog_background_thread_end);
+ mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread);
+
for (;;)
{
/*
@@ -8333,7 +8632,16 @@ start_binlog_background_thread()
binlog_background_thread, NULL))
return 1;
- binlog_background_thread_started= true;
+ /*
+ Wait for the thread to have started (so we know that the slave replication
+ state is loaded and we have correct global_gtid_counter).
+ */
+ mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread);
+ while (!binlog_background_thread_started)
+ mysql_cond_wait(&mysql_bin_log.COND_binlog_background_thread_end,
+ &mysql_bin_log.LOCK_binlog_background_thread);
+ mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread);
+
return 0;
}
@@ -8412,6 +8720,37 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
}
break;
}
+ case GTID_LIST_EVENT:
+ if (first_round)
+ {
+ uint32 i;
+ Gtid_list_log_event *glev= (Gtid_list_log_event *)ev;
+
+ /* Initialise the binlog state from the Gtid_list event. */
+ rpl_global_gtid_binlog_state.reset();
+ for (i= 0; i < glev->count; ++i)
+ {
+ if (rpl_global_gtid_binlog_state.update(&(glev->list[i])))
+ goto err2;
+ }
+ }
+ break;
+
+ case GTID_EVENT:
+ if (first_round)
+ {
+ Gtid_log_event *gev= (Gtid_log_event *)ev;
+ rpl_gtid gtid;
+
+ /* Update the binlog state with any GTID logged after Gtid_list. */
+ gtid.domain_id= gev->domain_id;
+ gtid.server_id= gev->server_id;
+ gtid.seq_no= gev->seq_no;
+ if (rpl_global_gtid_binlog_state.update(&gtid))
+ goto err2;
+ }
+ break;
+
default:
/* Nothing. */
break;
diff --git a/sql/log.h b/sql/log.h
index da8faa36a00..bd20c8aee09 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -396,6 +396,7 @@ private:
( ((ulong)(c)>>1) == BINLOG_COOKIE_DUMMY_ID )
class binlog_cache_mngr;
+struct rpl_gtid;
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
{
private:
@@ -420,11 +421,10 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
bool using_stmt_cache;
bool using_trx_cache;
/*
- Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be
+ Extra events (COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be
written during group commit. The incident_event is only valid if
trx_data->has_incident() is true.
*/
- Log_event *begin_event;
Log_event *end_event;
Log_event *incident_event;
/* Set during group commit to record any per-thread error. */
@@ -507,6 +507,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
*/
uint *sync_period_ptr;
uint sync_counter;
+ /* Protect against reading the binlog state file twice. */
+ bool state_read;
inline uint get_sync_period()
{
@@ -773,6 +775,14 @@ public:
inline uint32 get_open_count() { return open_count; }
void set_status_variables(THD *thd);
bool is_xidlist_idle();
+ bool write_gtid_event(THD *thd, bool standalone, bool is_transactional);
+ int read_state_from_file();
+ int write_state_to_file();
+ int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
+ bool find_in_binlog_state(uint32 domain_id, uint32 server_id,
+ rpl_gtid *out_gtid);
+ bool lookup_domain_in_binlog_state(uint32 domain_id, rpl_gtid *out_gtid);
+ void bump_seq_no_counter_if_needed(uint64 seq_no);
};
class Log_event_handler
diff --git a/sql/log_event.cc b/sql/log_event.cc
index ccfbd621c04..27620ea465a 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -537,7 +537,7 @@ static char *slave_load_file_stem(char *buf, uint file_id,
to_unix_path(buf);
buf = strend(buf);
- buf = int10_to_str(::server_id, buf, 10);
+ buf = int10_to_str(global_system_variables.server_id, buf, 10);
*buf++ = '-';
buf = int10_to_str(event_server_id, buf, 10);
*buf++ = '-';
@@ -573,7 +573,7 @@ static void cleanup_load_tmpdir()
LOAD DATA.
*/
p= strmake(prefbuf, STRING_WITH_LEN(PREFIX_SQL_LOAD));
- p= int10_to_str(::server_id, p, 10);
+ p= int10_to_str(global_system_variables.server_id, p, 10);
*(p++)= '-';
*p= 0;
@@ -749,6 +749,8 @@ const char* Log_event::get_type_str(Log_event_type type)
case INCIDENT_EVENT: return "Incident";
case ANNOTATE_ROWS_EVENT: return "Annotate_rows";
case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint";
+ case GTID_EVENT: return "Gtid";
+ case GTID_LIST_EVENT: return "Gtid_list";
default: return "Unknown"; /* impossible */
}
}
@@ -769,7 +771,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
crc(0), thd(thd_arg),
checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
- server_id= thd->server_id;
+ server_id= thd->variables.server_id;
when= thd->start_time;
when_sec_part=thd->start_time_sec_part;
@@ -794,7 +796,7 @@ Log_event::Log_event()
cache_type(Log_event::EVENT_INVALID_CACHE), crc(0),
thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
- server_id= ::server_id;
+ server_id= global_system_variables.server_id;
/*
We can't call my_time() here as this would cause a call before
my_init() is called
@@ -909,9 +911,11 @@ int Log_event::do_update_pos(Relay_log_info *rli)
if (debug_not_change_ts_if_art_event == 1
&& is_artificial_event())
debug_not_change_ts_if_art_event= 0; );
- rli->stmt_done(log_pos, is_artificial_event() &&
- IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ?
- 0 : when);
+ rli->stmt_done(log_pos,
+ (is_artificial_event() &&
+ IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ?
+ 0 : when),
+ thd);
DBUG_EXECUTE_IF("let_first_flush_log_change_timestamp",
if (debug_not_change_ts_if_art_event == 0)
debug_not_change_ts_if_art_event= 2; );
@@ -926,10 +930,11 @@ Log_event::do_shall_skip(Relay_log_info *rli)
DBUG_PRINT("info", ("ev->server_id: %lu, ::server_id: %lu,"
" rli->replicate_same_server_id: %d,"
" rli->slave_skip_counter: %lu",
- (ulong) server_id, (ulong) ::server_id,
+ (ulong) server_id, (ulong) global_system_variables.server_id,
rli->replicate_same_server_id,
rli->slave_skip_counter));
- if ((server_id == ::server_id && !rli->replicate_same_server_id) ||
+ if ((server_id == global_system_variables.server_id &&
+ !rli->replicate_same_server_id) ||
(rli->slave_skip_counter == 1 && rli->is_in_group()) ||
(flags & LOG_EVENT_SKIP_REPLICATION_F &&
opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE))
@@ -1560,6 +1565,12 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
case BINLOG_CHECKPOINT_EVENT:
ev = new Binlog_checkpoint_log_event(buf, event_len, description_event);
break;
+ case GTID_EVENT:
+ ev = new Gtid_log_event(buf, event_len, description_event);
+ break;
+ case GTID_LIST_EVENT:
+ ev = new Gtid_list_log_event(buf, event_len, description_event);
+ break;
#ifdef HAVE_REPLICATION
case SLAVE_EVENT: /* can never happen (unused event) */
ev = new Slave_log_event(buf, event_len, description_event);
@@ -3437,6 +3448,53 @@ Query_log_event::dummy_event(String *packet, ulong ev_offset,
return 0;
}
+/*
+ Replace an event (GTID event) with a BEGIN query event, to be compatible
+ with an old slave.
+*/
+int
+Query_log_event::begin_event(String *packet, ulong ev_offset,
+ uint8 checksum_alg)
+{
+ uchar *p= (uchar *)packet->ptr() + ev_offset;
+ uchar *q= p + LOG_EVENT_HEADER_LEN;
+ size_t data_len= packet->length() - ev_offset;
+ uint16 flags;
+
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ data_len-= BINLOG_CHECKSUM_LEN;
+ else
+ DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
+
+ /* Currently we only need to replace GTID event. */
+ DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN);
+ if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+ return 1;
+
+ flags= uint2korr(p + FLAGS_OFFSET);
+ flags&= ~LOG_EVENT_THREAD_SPECIFIC_F;
+ flags|= LOG_EVENT_SUPPRESS_USE_F;
+ int2store(p + FLAGS_OFFSET, flags);
+
+ p[EVENT_TYPE_OFFSET]= QUERY_EVENT;
+ int4store(q + Q_THREAD_ID_OFFSET, 0);
+ int4store(q + Q_EXEC_TIME_OFFSET, 0);
+ q[Q_DB_LEN_OFFSET]= 0;
+ int2store(q + Q_ERR_CODE_OFFSET, 0);
+ int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0);
+ q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */
+ q+= Q_DATA_OFFSET + 1;
+ memcpy(q, "BEGIN", 5);
+
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ {
+ ha_checksum crc= my_checksum(0L, p, data_len);
+ int4store(p + data_len, crc);
+ }
+ return 0;
+}
+
#ifdef MYSQL_CLIENT
/**
@@ -3696,6 +3754,8 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
LEX_STRING new_db;
int expected_error,actual_error= 0;
HA_CREATE_INFO db_options;
+ uint64 sub_id= 0;
+ rpl_gtid gtid;
DBUG_ENTER("Query_log_event::do_apply_event");
/*
@@ -3883,6 +3943,30 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
else
thd->variables.collation_database= thd->db_charset;
+ /*
+ Record any GTID in the same transaction, so slave state is
+ transactionally consistent.
+ */
+ if (strcmp("COMMIT", query) == 0 && (sub_id= rli->gtid_sub_id))
+ {
+ /* Clear the GTID from the RLI so we don't accidentally reuse it. */
+ const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0;
+
+ gtid= rli->current_gtid;
+ if (rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true))
+ {
+ rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
+ "Error during COMMIT: failed to update GTID state in "
+ "%s.%s: %d: %s",
+ "mysql", rpl_gtid_slave_state_table_name.str,
+ thd->stmt_da->sql_errno(), thd->stmt_da->message());
+ trans_rollback(thd);
+ sub_id= 0;
+ thd->is_slave_error= 1;
+ goto end;
+ }
+ }
+
thd->table_map_for_update= (table_map)table_map_for_update;
thd->set_invoker(&user, &host);
/*
@@ -4068,6 +4152,9 @@ Default database: '%s'. Query: '%s'",
}
end:
+ if (sub_id && !thd->is_slave_error)
+ rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
+
/*
Probably we have set thd->query, thd->db, thd->catalog to point to places
in the data_buf of this event. Now the event is going to be deleted
@@ -4145,6 +4232,17 @@ Query_log_event::do_shall_skip(Relay_log_info *rli)
DBUG_RETURN(Log_event::do_shall_skip(rli));
}
+
+bool
+Query_log_event::peek_is_commit_rollback(const char *event_start,
+ size_t event_len)
+{
+ if (event_len < LOG_EVENT_HEADER_LEN + QUERY_HEADER_LEN || event_len < 9)
+ return false;
+ return !memcmp(event_start + (event_len-7), "\0COMMIT", 7) ||
+ !memcmp(event_start + (event_len-9), "\0ROLLBACK", 9);
+}
+
#endif
@@ -4459,6 +4557,8 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN;
post_header_len[BINLOG_CHECKPOINT_EVENT-1]=
BINLOG_CHECKPOINT_HEADER_LEN;
+ post_header_len[GTID_EVENT-1]= GTID_HEADER_LEN;
+ post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN;
// Sanity-check that all post header lengths are initialized.
int i;
@@ -4663,7 +4763,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
perform, we don't call Start_log_event_v3::do_apply_event()
(this was just to update the log's description event).
*/
- if (server_id != (uint32) ::server_id)
+ if (server_id != (uint32) global_system_variables.server_id)
{
/*
If the event was not requested by the slave i.e. the master sent
@@ -4689,7 +4789,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
{
- if (server_id == (uint32) ::server_id)
+ if (server_id == (uint32) global_system_variables.server_id)
{
/*
We only increase the relay log position if we are skipping
@@ -5744,7 +5844,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
#endif
DBUG_PRINT("info", ("server_id=%lu; ::server_id=%lu",
- (ulong) this->server_id, (ulong) ::server_id));
+ (ulong) this->server_id, (ulong) global_system_variables.server_id));
DBUG_PRINT("info", ("new_log_ident: %s", this->new_log_ident));
DBUG_PRINT("info", ("pos: %s", llstr(this->pos, buf)));
@@ -5764,7 +5864,8 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
5.0.0, there also are some rotates from the slave itself, in the
relay log, which shall not change the group positions.
*/
- if ((server_id != ::server_id || rli->replicate_same_server_id) &&
+ if ((server_id != global_system_variables.server_id ||
+ rli->replicate_same_server_id) &&
!is_relay_log_event() &&
!rli->is_in_group())
{
@@ -5781,6 +5882,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
rli->group_master_log_name,
(ulong) rli->group_master_log_pos));
mysql_mutex_unlock(&rli->data_lock);
+ rpl_global_gtid_slave_state.record_and_update_gtid(thd, rli);
flush_relay_log_info(rli);
/*
@@ -5905,6 +6007,394 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file)
/**************************************************************************
+ Global transaction ID stuff
+**************************************************************************/
+
+Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event)
+ : Log_event(buf, description_event), seq_no(0)
+{
+ uint8 header_size= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1];
+ if (event_len < header_size + post_header_len ||
+ post_header_len < GTID_HEADER_LEN)
+ return;
+
+ buf+= header_size;
+ seq_no= uint8korr(buf);
+ buf+= 8;
+ domain_id= uint4korr(buf);
+ buf+= 4;
+ flags2= *buf;
+}
+
+
+#ifdef MYSQL_SERVER
+
+Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
+ uint32 domain_id_arg, bool standalone,
+ uint16 flags_arg, bool is_transactional)
+ : Log_event(thd_arg, flags_arg, is_transactional),
+ seq_no(seq_no_arg), domain_id(domain_id_arg),
+ flags2(standalone ? FL_STANDALONE : 0)
+{
+ cache_type= Log_event::EVENT_NO_CACHE;
+}
+
+
+/*
+ Used to record GTID while sending binlog to slave, without having to
+ fully contruct every Gtid_log_event() needlessly.
+*/
+bool
+Gtid_log_event::peek(const char *event_start, size_t event_len,
+ uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
+ uchar *flags2)
+{
+ const char *p;
+ if (event_len < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+ return true;
+ *server_id= uint4korr(event_start + SERVER_ID_OFFSET);
+ p= event_start + LOG_EVENT_HEADER_LEN;
+ *seq_no= uint8korr(p);
+ p+= 8;
+ *domain_id= uint4korr(p);
+ p+= 4;
+ *flags2= (uchar)*p;
+ return false;
+}
+
+
+bool
+Gtid_log_event::write(IO_CACHE *file)
+{
+ uchar buf[GTID_HEADER_LEN];
+ int8store(buf, seq_no);
+ int4store(buf+8, domain_id);
+ buf[12]= flags2;
+ bzero(buf+13, GTID_HEADER_LEN-13);
+ return write_header(file, GTID_HEADER_LEN) ||
+ wrapper_my_b_safe_write(file, buf, GTID_HEADER_LEN) ||
+ write_footer(file);
+}
+
+
+/*
+ Replace a GTID event with either a BEGIN event, dummy event, or nothing, as
+ appropriate to work with old slave that does not know global transaction id.
+
+ The need_dummy_event argument is an IN/OUT argument. It is passed as TRUE
+ if slave has capability lower than MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES.
+ It is returned TRUE if we return a BEGIN (or dummy) event to be sent to the
+ slave, FALSE if event should be skipped completely.
+*/
+int
+Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
+ ulong ev_offset, uint8 checksum_alg)
+{
+ uchar flags2;
+ if (packet->length() - ev_offset < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+ return 1;
+ flags2= (*packet)[ev_offset + LOG_EVENT_HEADER_LEN + 12];
+ if (flags2 & FL_STANDALONE)
+ {
+ if (need_dummy_event)
+ return Query_log_event::dummy_event(packet, ev_offset, checksum_alg);
+ else
+ return 0;
+ }
+
+ *need_dummy_event= true;
+ return Query_log_event::begin_event(packet, ev_offset, checksum_alg);
+}
+
+
+#ifdef HAVE_REPLICATION
+void
+Gtid_log_event::pack_info(THD *thd, Protocol *protocol)
+{
+ char buf[6+5+10+1+10+1+20+1];
+ char *p;
+ p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : "BEGIN GTID "));
+ p= longlong10_to_str(domain_id, p, 10);
+ *p++= '-';
+ p= longlong10_to_str(server_id, p, 10);
+ *p++= '-';
+ p= longlong10_to_str(seq_no, p, 10);
+
+ protocol->store(buf, p-buf, &my_charset_bin);
+}
+
+static char gtid_begin_string[] = "BEGIN";
+
+int
+Gtid_log_event::do_apply_event(Relay_log_info const *rli)
+{
+ thd->variables.server_id= this->server_id;
+ thd->variables.gtid_domain_id= this->domain_id;
+ thd->variables.gtid_seq_no= this->seq_no;
+
+ if (flags2 & FL_STANDALONE)
+ return 0;
+
+ /* Execute this like a BEGIN query event. */
+ thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1,
+ &my_charset_bin, next_query_id());
+ Parser_state parser_state;
+ if (!parser_state.init(thd, thd->query(), thd->query_length()))
+ {
+ mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
+ /* Finalize server status flags after executing a statement. */
+ thd->update_server_status();
+ log_slow_statement(thd);
+ if (unlikely(thd->is_fatal_error))
+ thd->is_slave_error= 1;
+ else if (likely(!thd->is_slave_error))
+ general_log_write(thd, COM_QUERY, thd->query(), thd->query_length());
+ }
+
+ thd->reset_query();
+ free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
+ return thd->is_slave_error;
+}
+
+
+int
+Gtid_log_event::do_update_pos(Relay_log_info *rli)
+{
+ rli->inc_event_relay_log_pos();
+ return 0;
+}
+
+
+Log_event::enum_skip_reason
+Gtid_log_event::do_shall_skip(Relay_log_info *rli)
+{
+ /*
+ An event skipped due to @@skip_replication must not be counted towards the
+ number of events to be skipped due to @@sql_slave_skip_counter.
+ */
+ if (flags & LOG_EVENT_SKIP_REPLICATION_F &&
+ opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE)
+ return Log_event::EVENT_SKIP_IGNORE;
+
+ if (rli->slave_skip_counter > 0)
+ {
+ if (!(flags2 & FL_STANDALONE))
+ thd->variables.option_bits|= OPTION_BEGIN;
+ return Log_event::continue_group(rli);
+ }
+ return Log_event::do_shall_skip(rli);
+}
+
+
+#endif /* HAVE_REPLICATION */
+
+#else /* !MYSQL_SERVER */
+
+void
+Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
+{
+ Write_on_release_cache cache(&print_event_info->head_cache, file,
+ Write_on_release_cache::FLUSH_F);
+ char buf[21];
+
+ if (!print_event_info->short_form)
+ {
+ print_header(&cache, print_event_info, FALSE);
+ longlong10_to_str(seq_no, buf, 10);
+ my_b_printf(&cache, "\tGTID %u-%u-%s\n", domain_id, server_id, buf);
+
+ if (!print_event_info->domain_id_printed ||
+ print_event_info->domain_id != domain_id)
+ {
+ my_b_printf(&cache, "/*!100001 SET @@session.gtid_domain_id=%u*/%s\n",
+ domain_id, print_event_info->delimiter);
+ print_event_info->domain_id= domain_id;
+ print_event_info->domain_id_printed= true;
+ }
+
+ if (!print_event_info->server_id_printed ||
+ print_event_info->server_id != server_id)
+ {
+ my_b_printf(&cache, "/*!100001 SET @@session.server_id=%u*/%s\n",
+ server_id, print_event_info->delimiter);
+ print_event_info->server_id= server_id;
+ print_event_info->server_id_printed= true;
+ }
+
+ my_b_printf(&cache, "/*!100001 SET @@session.gtid_seq_no=%s*/%s\n",
+ buf, print_event_info->delimiter);
+ }
+ if (!(flags2 & FL_STANDALONE))
+ my_b_printf(&cache, "BEGIN\n%s\n", print_event_info->delimiter);
+}
+
+#endif /* MYSQL_SERVER */
+
+
+/* GTID list. */
+
+Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event)
+ : Log_event(buf, description_event), count(0), list(0)
+{
+ uint32 i;
+ uint8 header_size= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[GTID_LIST_EVENT-1];
+ if (event_len < header_size + post_header_len ||
+ post_header_len < GTID_LIST_HEADER_LEN)
+ return;
+
+ buf+= header_size;
+ count= uint4korr(buf) & ((1<<28)-1);
+ buf+= 4;
+ if (event_len - (header_size + post_header_len) < count*element_size ||
+ (!(list= (rpl_gtid *)my_malloc(count*sizeof(*list) + (count == 0),
+ MYF(MY_WME)))))
+ return;
+
+ for (i= 0; i < count; ++i)
+ {
+ list[i].domain_id= uint4korr(buf);
+ buf+= 4;
+ list[i].server_id= uint4korr(buf);
+ buf+= 4;
+ list[i].seq_no= uint8korr(buf);
+ buf+= 8;
+ }
+}
+
+
+#ifdef MYSQL_SERVER
+
+Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
+ : count(gtid_set->count()), list(0)
+{
+ cache_type= EVENT_NO_CACHE;
+ /* Failure to allocate memory will be caught by is_valid() returning false. */
+ if (count < (1<<28) &&
+ (list = (rpl_gtid *)my_malloc(count * sizeof(*list) + (count == 0),
+ MYF(MY_WME))))
+ gtid_set->get_gtid_list(list, count);
+}
+
+bool
+Gtid_list_log_event::write(IO_CACHE *file)
+{
+ uint32 i;
+ uchar buf[element_size];
+
+ DBUG_ASSERT(count < 1<<28);
+
+ if (write_header(file, get_data_size()))
+ return 1;
+ int4store(buf, count & ((1<<28)-1));
+ if (wrapper_my_b_safe_write(file, buf, GTID_LIST_HEADER_LEN))
+ return 1;
+ for (i= 0; i < count; ++i)
+ {
+ int4store(buf, list[i].domain_id);
+ int4store(buf+4, list[i].server_id);
+ int8store(buf+8, list[i].seq_no);
+ if (wrapper_my_b_safe_write(file, buf, element_size))
+ return 1;
+ }
+ return write_footer(file);
+}
+
+
+#ifdef HAVE_REPLICATION
+void
+Gtid_list_log_event::pack_info(THD *thd, Protocol *protocol)
+{
+ char buf_mem[1024];
+ String buf(buf_mem, sizeof(buf_mem), system_charset_info);
+ uint32 i;
+ bool first;
+
+ buf.length(0);
+ buf.append(STRING_WITH_LEN("["));
+ first= true;
+ for (i= 0; i < count; ++i)
+ rpl_slave_state_tostring_helper(&buf, &list[i], &first);
+ buf.append(STRING_WITH_LEN("]"));
+
+ protocol->store(&buf);
+}
+#endif /* HAVE_REPLICATION */
+
+#else /* !MYSQL_SERVER */
+
+void
+Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
+{
+ if (!print_event_info->short_form)
+ {
+ Write_on_release_cache cache(&print_event_info->head_cache, file,
+ Write_on_release_cache::FLUSH_F);
+ char buf[21];
+ uint32 i;
+
+ print_header(&cache, print_event_info, FALSE);
+ my_b_printf(&cache, "\tGtid list [");
+ for (i= 0; i < count; ++i)
+ {
+ longlong10_to_str(list[i].seq_no, buf, 10);
+ my_b_printf(&cache, "%u-%u-%s", list[i].domain_id,
+ list[i].server_id, buf);
+ if (i < count-1)
+ my_b_printf(&cache, ",\n# ");
+ }
+ my_b_printf(&cache, "]\n");
+ }
+}
+
+#endif /* MYSQL_SERVER */
+
+
+/*
+ Used to record gtid_list event while sending binlog to slave, without having to
+ fully contruct the event object.
+*/
+bool
+Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
+ rpl_gtid **out_gtid_list, uint32 *out_list_len)
+{
+ const char *p;
+ uint32 count_field, count;
+ rpl_gtid *gtid_list;
+
+ if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN)
+ return true;
+ p= event_start + LOG_EVENT_HEADER_LEN;
+ count_field= uint4korr(p);
+ p+= 4;
+ count= count_field & ((1<<28)-1);
+ if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN +
+ 16 * count)
+ return true;
+ if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(rpl_gtid)*count + (count == 0),
+ MYF(MY_WME))))
+ return true;
+ *out_gtid_list= gtid_list;
+ *out_list_len= count;
+ while (count--)
+ {
+ gtid_list->domain_id= uint4korr(p);
+ p+= 4;
+ gtid_list->server_id= uint4korr(p);
+ p+= 4;
+ gtid_list->seq_no= uint8korr(p);
+ p+= 8;
+ ++gtid_list;
+ }
+
+ return false;
+}
+
+
+/**************************************************************************
Intvar_log_event methods
**************************************************************************/
@@ -6257,12 +6747,43 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
int Xid_log_event::do_apply_event(Relay_log_info const *rli)
{
bool res;
+ int err;
+ rpl_gtid gtid;
+ uint64 sub_id;
+
+ /*
+ Record any GTID in the same transaction, so slave state is transactionally
+ consistent.
+ */
+ if ((sub_id= rli->gtid_sub_id))
+ {
+ /* Clear the GTID from the RLI so we don't accidentally reuse it. */
+ const_cast<Relay_log_info*>(rli)->gtid_sub_id= 0;
+
+ gtid= rli->current_gtid;
+ err= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true);
+ if (err)
+ {
+ rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
+ "Error during XID COMMIT: failed to update GTID state in "
+ "%s.%s: %d: %s",
+ "mysql", rpl_gtid_slave_state_table_name.str,
+ thd->stmt_da->sql_errno(), thd->stmt_da->message());
+ trans_rollback(thd);
+ thd->is_slave_error= 1;
+ return err;
+ }
+ }
+
/* For a slave Xid_log_event is COMMIT */
general_log_print(thd, COM_QUERY,
"COMMIT /* implicit, from Xid_log_event */");
res= trans_commit(thd); /* Automatically rolls back on error. */
thd->mdl_context.release_transactional_locks();
+ if (sub_id)
+ rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
+
/*
Increment the global status commit count variable
*/
@@ -7009,6 +7530,7 @@ int Stop_log_event::do_update_pos(Relay_log_info *rli)
rli->inc_event_relay_log_pos();
else
{
+ rpl_global_gtid_slave_state.record_and_update_gtid(thd, rli);
rli->inc_group_relay_log_pos(0);
flush_relay_log_info(rli);
}
@@ -8806,7 +9328,7 @@ Rows_log_event::do_update_pos(Relay_log_info *rli)
Step the group log position if we are not in a transaction,
otherwise increase the event log position.
*/
- rli->stmt_done(log_pos, when);
+ rli->stmt_done(log_pos, when, thd);
/*
Clear any errors in thd->net.last_err*. It is not known if this is
needed or not. It is believed that any errors that may exist in
@@ -11150,7 +11672,9 @@ st_print_event_info::st_print_event_info()
auto_increment_increment(0),auto_increment_offset(0), charset_inited(0),
lc_time_names_number(~0),
charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER),
- thread_id(0), thread_id_printed(false), skip_replication(0),
+ thread_id(0), thread_id_printed(false), server_id(0),
+ server_id_printed(false), domain_id(0), domain_id_printed(false),
+ skip_replication(0),
base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE)
{
/*
diff --git a/sql/log_event.h b/sql/log_event.h
index be63304b529..1c2b2769915 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -50,6 +50,8 @@
#include "sql_class.h" /* THD */
#endif
+#include "rpl_gtid.h"
+
/* Forward declarations */
class String;
@@ -261,6 +263,8 @@ struct sql_ex_info
#define HEARTBEAT_HEADER_LEN 0
#define ANNOTATE_ROWS_HEADER_LEN 0
#define BINLOG_CHECKPOINT_HEADER_LEN 4
+#define GTID_HEADER_LEN 19
+#define GTID_LIST_HEADER_LEN 4
/*
Max number of possible extra bytes in a replication event compared to a
@@ -600,16 +604,13 @@ enum enum_binlog_checksum_alg {
because they mis-compute the offsets into the master's binlog).
*/
#define MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES 2
-/* MariaDB > 5.5, which knows about binlog_checkpoint_log_event. */
+/* MariaDB >= 10.0, which knows about binlog_checkpoint_log_event. */
#define MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT 3
-/*
- MariaDB server which understands MySQL 5.6 ignorable events. This server
- can tolerate receiving any event with the LOG_EVENT_IGNORABLE_F flag set.
-*/
-#define MARIA_SLAVE_CAPABILITY_IGNORABLE 4
+/* MariaDB >= 10.0.1, which knows about global transaction id events. */
+#define MARIA_SLAVE_CAPABILITY_GTID 4
/* Our capability. */
-#define MARIA_SLAVE_CAPABILITY_MINE MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT
+#define MARIA_SLAVE_CAPABILITY_MINE MARIA_SLAVE_CAPABILITY_GTID
/**
@@ -695,6 +696,18 @@ enum Log_event_type
that are prepared in storage engines but not yet committed.
*/
BINLOG_CHECKPOINT_EVENT= 161,
+ /*
+ Gtid event. For global transaction ID, used to start a new event group,
+ instead of the old BEGIN query event, and also to mark stand-alone
+ events.
+ */
+ GTID_EVENT= 162,
+ /*
+ Gtid list event. Logged at the start of every binlog, to record the
+ current replication state. This consists of the last GTID seen for
+ each replication domain.
+ */
+ GTID_LIST_EVENT= 163,
/* Add new MariaDB events here - right above this comment! */
@@ -767,6 +780,11 @@ typedef struct st_print_event_info
uint charset_database_number;
uint thread_id;
bool thread_id_printed;
+ uint32 server_id;
+ bool server_id_printed;
+ uint32 domain_id;
+ bool domain_id_printed;
+
/*
Track when @@skip_replication changes so we need to output a SET
statement for it.
@@ -1302,6 +1320,35 @@ public:
return do_shall_skip(rli);
}
+
+ /*
+ Check if an event is non-final part of a stand-alone event group,
+ such as Intvar_log_event (such events should be processed as part
+ of the following event group, not individually).
+ */
+ static bool is_part_of_group(enum Log_event_type ev_type)
+ {
+ switch (ev_type)
+ {
+ case GTID_EVENT:
+ case INTVAR_EVENT:
+ case RAND_EVENT:
+ case USER_VAR_EVENT:
+ case TABLE_MAP_EVENT:
+ case ANNOTATE_ROWS_EVENT:
+ return true;
+ case DELETE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case WRITE_ROWS_EVENT:
+ /*
+ ToDo: also check for non-final Rows_log_event (though such events
+ are usually in a BEGIN-COMMIT group).
+ */
+ default:
+ return false;
+ }
+ }
+
protected:
/**
@@ -1875,6 +1922,7 @@ public:
}
Log_event_type get_type_code() { return QUERY_EVENT; }
static int dummy_event(String *packet, ulong ev_offset, uint8 checksum_alg);
+ static int begin_event(String *packet, ulong ev_offset, uint8 checksum_alg);
#ifdef MYSQL_SERVER
bool write(IO_CACHE* file);
virtual bool write_post_header_for_derived(IO_CACHE* file) { return FALSE; }
@@ -1897,6 +1945,8 @@ public: /* !!! Public in this patch to allow old usage */
int do_apply_event(Relay_log_info const *rli,
const char *query_arg,
uint32 q_len_arg);
+ static bool peek_is_commit_rollback(const char *event_start,
+ size_t event_len);
#endif /* HAVE_REPLICATION */
/*
If true, the event always be applied by slave SQL thread or be printed by
@@ -2410,7 +2460,7 @@ protected:
Events from ourself should be skipped, but they should not
decrease the slave skip counter.
*/
- if (this->server_id == ::server_id)
+ if (this->server_id == global_system_variables.server_id)
return Log_event::EVENT_SKIP_IGNORE;
else
return Log_event::EVENT_SKIP_NOT;
@@ -2815,7 +2865,7 @@ private:
Events from ourself should be skipped, but they should not
decrease the slave skip counter.
*/
- if (this->server_id == ::server_id)
+ if (this->server_id == global_system_variables.server_id)
return Log_event::EVENT_SKIP_IGNORE;
else
return Log_event::EVENT_SKIP_NOT;
@@ -2942,6 +2992,194 @@ public:
#endif
};
+
+/**
+ @class Gtid_log_event
+
+ This event is logged as part of every event group to give the global
+ transaction id (GTID) of that group.
+
+ It replaces the BEGIN query event used in earlier versions to begin most
+ event groups, but is also used for events that used to be stand-alone.
+
+ @section Gtid_log_event_binary_format Binary Format
+
+ The binary format for Gtid_log_event has 6 extra reserved bytes to make the
+ length a total of 19 byte (+ 19 bytes of header in common with all events).
+ This is just the minimal size for a BEGIN query event, which makes it easy
+ to replace this event with such BEGIN event to remain compatible with old
+ slave servers.
+
+ <table>
+ <caption>Post-Header</caption>
+
+ <tr>
+ <th>Name</th>
+ <th>Format</th>
+ <th>Description</th>
+ </tr>
+
+ <tr>
+ <td>seq_no</td>
+ <td>8 byte unsigned integer</td>
+ <td>increasing id within one server_id. Starts at 1, holes in the sequence
+ may occur</td>
+ </tr>
+
+ <tr>
+ <td>domain_id</td>
+ <td>4 byte unsigned integer</td>
+ <td>Replication domain id, identifying independent replication streams></td>
+ </tr>
+
+ <tr>
+ <td>flags</td>
+ <td>1 byte bitfield</td>
+ <td>Bit 0 set indicates stand-alone event (no terminating COMMIT)</td>
+ </tr>
+
+ <tr>
+ <td>Reserved</td>
+ <td>6 bytes</td>
+ <td>Reserved bytes, set to 0. Maybe be used for future expansion.</td>
+ </tr>
+ </table>
+
+ The Body of Gtid_log_event is empty. The total event size is 19 bytes +
+ the normal 19 bytes common-header.
+*/
+
+class Gtid_log_event: public Log_event
+{
+public:
+ uint64 seq_no;
+ uint32 domain_id;
+ uchar flags2;
+
+ /* Flags2. */
+
+ /* FL_STANDALONE is set when there is no terminating COMMIT event. */
+ static const uchar FL_STANDALONE= 1;
+
+#ifdef MYSQL_SERVER
+ Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,
+ uint16 flags, bool is_transactional);
+#ifdef HAVE_REPLICATION
+ void pack_info(THD *thd, Protocol *protocol);
+ virtual int do_apply_event(Relay_log_info const *rli);
+ virtual int do_update_pos(Relay_log_info *rli);
+ virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+#endif
+#else
+ void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
+#endif
+ Gtid_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event);
+ ~Gtid_log_event() { }
+ Log_event_type get_type_code() { return GTID_EVENT; }
+ int get_data_size() { return GTID_HEADER_LEN; }
+ bool is_valid() const { return seq_no != 0; }
+#ifdef MYSQL_SERVER
+ bool write(IO_CACHE *file);
+ static int make_compatible_event(String *packet, bool *need_dummy_event,
+ ulong ev_offset, uint8 checksum_alg);
+ static bool peek(const char *event_start, size_t event_len,
+ uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
+ uchar *flags2);
+#endif
+};
+
+
+/**
+ @class Gtid_list_log_event
+
+ This event is logged at the start of every binlog file to record the
+ current replication state: the last global transaction id (GTID) applied
+ on the server within each replication domain.
+
+ It consists of a list of GTIDs, one for each replication domain ever seen
+ on the server.
+
+ @section Gtid_list_log_event_binary_format Binary Format
+
+ <table>
+ <caption>Post-Header</caption>
+
+ <tr>
+ <th>Name</th>
+ <th>Format</th>
+ <th>Description</th>
+ </tr>
+
+ <tr>
+ <td>count</td>
+ <td>4 byte unsigned integer</td>
+ <td>The lower 28 bits are the number of GTIDs. The upper 4 bits are
+ reserved for flags bits for future expansion</td>
+ </tr>
+ </table>
+
+ <table>
+ <caption>Body</caption>
+
+ <tr>
+ <th>Name</th>
+ <th>Format</th>
+ <th>Description</th>
+ </tr>
+
+ <tr>
+ <td>domain_id</td>
+ <td>4 byte unsigned integer</td>
+ <td>Replication domain id of one GTID</td>
+ </tr>
+
+ <tr>
+ <td>server_id</td>
+ <td>4 byte unsigned integer</td>
+ <td>Server id of one GTID</td>
+ </tr>
+
+ <tr>
+ <td>seq_no</td>
+ <td>8 byte unsigned integer</td>
+ <td>sequence number of one GTID</td>
+ </tr>
+ </table>
+
+ The three elements in the body repeat COUNT times to form the GTID list.
+*/
+
+class Gtid_list_log_event: public Log_event
+{
+public:
+ uint32 count;
+ struct rpl_gtid *list;
+
+ static const uint element_size= 4+4+8;
+
+#ifdef MYSQL_SERVER
+ Gtid_list_log_event(rpl_binlog_state *gtid_set);
+#ifdef HAVE_REPLICATION
+ void pack_info(THD *thd, Protocol *protocol);
+#endif
+#else
+ void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
+#endif
+ Gtid_list_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event);
+ ~Gtid_list_log_event() { my_free(list); }
+ Log_event_type get_type_code() { return GTID_LIST_EVENT; }
+ int get_data_size() { return GTID_LIST_HEADER_LEN + count*element_size; }
+ bool is_valid() const { return list != NULL; }
+#ifdef MYSQL_SERVER
+ bool write(IO_CACHE *file);
+#endif
+ static bool peek(const char *event_start, uint32 event_len,
+ rpl_gtid **out_gtid_list, uint32 *out_list_len);
+};
+
+
/* the classes below are for the new LOAD DATA INFILE logging */
/**
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc
index e9afe474418..698118e3bda 100644
--- a/sql/log_event_old.cc
+++ b/sql/log_event_old.cc
@@ -1847,7 +1847,7 @@ Old_rows_log_event::do_update_pos(Relay_log_info *rli)
Step the group log position if we are not in a transaction,
otherwise increase the event log position.
*/
- rli->stmt_done(log_pos, when);
+ rli->stmt_done(log_pos, when, thd);
/*
Clear any errors in thd->net.last_err*. It is not known if this is
needed or not. It is believed that any errors that may exist in
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 35f57bebd02..80b6428fca2 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -680,6 +680,8 @@ mysql_mutex_t
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats;
+mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state;
+
/**
The below lock protects access to two global server variables:
max_prepared_stmt_count and prepared_stmt_count. These variables
@@ -840,12 +842,15 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_thread_count, key_LOCK_thread_cache,
key_PARTITION_LOCK_auto_inc;
PSI_mutex_key key_RELAYLOG_LOCK_index;
+PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats,
key_LOCK_wakeup_ready;
+PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state;
+
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
static PSI_mutex_info all_server_mutexes[]=
@@ -889,6 +894,8 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
+ { &key_LOCK_gtid_counter, "LOCK_gtid_counter", PSI_FLAG_GLOBAL},
+ { &key_LOCK_rpl_gtid_state, "LOCK_rpl_gtid_state", PSI_FLAG_GLOBAL},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
@@ -909,7 +916,9 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
- { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0}
+ { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0},
+ { &key_LOCK_slave_state, "LOCK_slave_state", 0},
+ { &key_LOCK_binlog_state, "LOCK_binlog_state", 0}
};
PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
@@ -1036,6 +1045,7 @@ PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest,
key_file_trg, key_file_trn, key_file_init;
PSI_file_key key_file_query_log, key_file_slow_log;
PSI_file_key key_file_relaylog, key_file_relaylog_index;
+PSI_file_key key_file_binlog_state;
#endif /* HAVE_PSI_INTERFACE */
@@ -1383,6 +1393,12 @@ struct st_VioSSLFd *ssl_acceptor_fd;
*/
uint connection_count= 0, extra_connection_count= 0;
+/**
+ Running counter for generating new GTIDs locally.
+*/
+uint64 global_gtid_counter= 0;
+
+
/* Function declarations */
pthread_handler_t signal_hand(void *arg);
@@ -1870,6 +1886,7 @@ static void mysqld_exit(int exit_code)
but if a kill -15 signal was sent, the signal thread did
spawn the kill_server_thread thread, which is running concurrently.
*/
+ rpl_deinit_gtid_slave_state();
wait_for_signal_thread_to_end();
mysql_audit_finalize();
clean_up_mutexes();
@@ -2042,6 +2059,8 @@ static void clean_up_mutexes()
mysql_mutex_destroy(&LOCK_global_user_client_stats);
mysql_mutex_destroy(&LOCK_global_table_stats);
mysql_mutex_destroy(&LOCK_global_index_stats);
+ mysql_mutex_destroy(&LOCK_gtid_counter);
+ mysql_mutex_destroy(&LOCK_rpl_gtid_state);
#ifdef HAVE_OPENSSL
mysql_mutex_destroy(&LOCK_des_key_file);
#ifndef HAVE_YASSL
@@ -4199,6 +4218,7 @@ static int init_thread_environment()
mysql_mutex_init(key_LOCK_active_mi, &LOCK_active_mi, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_global_system_variables,
&LOCK_global_system_variables, MY_MUTEX_INIT_FAST);
+ mysql_mutex_record_order(&LOCK_active_mi, &LOCK_global_system_variables);
mysql_rwlock_init(key_rwlock_LOCK_system_variables_hash,
&LOCK_system_variables_hash);
mysql_mutex_init(key_LOCK_prepared_stmt_count,
@@ -4216,6 +4236,10 @@ static int init_thread_environment()
&LOCK_global_table_stats, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_global_index_stats,
&LOCK_global_index_stats, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_gtid_counter,
+ &LOCK_gtid_counter, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_rpl_gtid_state,
+ &LOCK_rpl_gtid_state, MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered,
MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
@@ -4259,6 +4283,10 @@ static int init_thread_environment()
PTHREAD_CREATE_DETACHED);
pthread_attr_setscope(&connection_attrib, PTHREAD_SCOPE_SYSTEM);
+#ifdef HAVE_REPLICATION
+ rpl_init_gtid_slave_state();
+#endif
+
DBUG_RETURN(0);
}
@@ -5145,9 +5173,9 @@ int mysqld_main(int argc, char **argv)
set_user(mysqld_user, user_info);
}
- if (opt_bin_log && !server_id)
+ if (opt_bin_log && !global_system_variables.server_id)
{
- server_id= 1;
+ global_system_variables.server_id= ::server_id= 1;
#ifdef EXTRA_DEBUG
sql_print_warning("You have enabled the binary log, but you haven't set "
"server-id to a non-zero value: we force server id to 1; "
@@ -6895,19 +6923,25 @@ static int show_rpl_status(THD *thd, SHOW_VAR *var, char *buff)
static int show_slave_running(THD *thd, SHOW_VAR *var, char *buff)
{
Master_info *mi;
+ bool tmp;
+ LINT_INIT(tmp);
+
var->type= SHOW_MY_BOOL;
var->value= buff;
+ mysql_mutex_unlock(&LOCK_status);
mysql_mutex_lock(&LOCK_active_mi);
mi= master_info_index->
get_master_info(&thd->variables.default_master_connection,
MYSQL_ERROR::WARN_LEVEL_NOTE);
if (mi)
- *((my_bool *)buff)= (my_bool) (mi->slave_running ==
- MYSQL_SLAVE_RUN_CONNECT &&
- mi->rli.slave_running);
+ tmp= (my_bool) (mi->slave_running == MYSQL_SLAVE_RUN_CONNECT &&
+ mi->rli.slave_running);
+ mysql_mutex_unlock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_status);
+ if (mi)
+ *((my_bool *)buff)= tmp;
else
var->type= SHOW_UNDEF;
- mysql_mutex_unlock(&LOCK_active_mi);
return 0;
}
@@ -6915,17 +6949,24 @@ static int show_slave_running(THD *thd, SHOW_VAR *var, char *buff)
static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff)
{
Master_info *mi;
+ longlong tmp;
+ LINT_INIT(tmp);
+
var->type= SHOW_LONGLONG;
var->value= buff;
+ mysql_mutex_unlock(&LOCK_status);
mysql_mutex_lock(&LOCK_active_mi);
mi= master_info_index->
get_master_info(&thd->variables.default_master_connection,
MYSQL_ERROR::WARN_LEVEL_NOTE);
if (mi)
- *((longlong *)buff)= mi->received_heartbeats;
+ tmp= mi->received_heartbeats;
+ mysql_mutex_unlock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_status);
+ if (mi)
+ *((longlong *)buff)= tmp;
else
var->type= SHOW_UNDEF;
- mysql_mutex_unlock(&LOCK_active_mi);
return 0;
}
@@ -6933,17 +6974,24 @@ static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff)
static int show_heartbeat_period(THD *thd, SHOW_VAR *var, char *buff)
{
Master_info *mi;
+ float tmp;
+ LINT_INIT(tmp);
+
var->type= SHOW_CHAR;
var->value= buff;
+ mysql_mutex_unlock(&LOCK_status);
mysql_mutex_lock(&LOCK_active_mi);
mi= master_info_index->
get_master_info(&thd->variables.default_master_connection,
MYSQL_ERROR::WARN_LEVEL_NOTE);
if (mi)
- sprintf(buff, "%.3f", mi->heartbeat_period);
+ tmp= mi->heartbeat_period;
+ mysql_mutex_unlock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_status);
+ if (mi)
+ sprintf(buff, "%.3f", tmp);
else
var->type= SHOW_UNDEF;
- mysql_mutex_unlock(&LOCK_active_mi);
return 0;
}
@@ -8142,6 +8190,7 @@ mysqld_get_one_option(int optid,
break;
case OPT_SERVER_ID:
server_id_supplied = 1;
+ ::server_id= global_system_variables.server_id;
break;
case OPT_ONE_THREAD:
thread_handling= SCHEDULER_NO_THREADS;
@@ -8930,7 +8979,8 @@ static PSI_file_info all_server_files[]=
{ &key_file_tclog, "tclog", 0},
{ &key_file_trg, "trigger_name", 0},
{ &key_file_trn, "trigger", 0},
- { &key_file_init, "init", 0}
+ { &key_file_init, "init", 0},
+ { &key_file_binlog_state, "binlog_state", 0}
};
#endif /* HAVE_PSI_INTERFACE */
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 7e4295365ce..0717bcff718 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -249,11 +249,14 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
+extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats, key_LOCK_wakeup_ready;
+extern PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state;
+
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock;
@@ -295,6 +298,7 @@ extern PSI_file_key key_file_query_log, key_file_slow_log;
extern PSI_file_key key_file_relaylog, key_file_relaylog_index;
extern PSI_socket_key key_socket_tcpip, key_socket_unix,
key_socket_client_connection;
+extern PSI_file_key key_file_binlog_state;
void init_server_psi_keys();
#endif /* HAVE_PSI_INTERFACE */
@@ -472,6 +476,7 @@ extern mysql_mutex_t
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables, LOCK_user_conn,
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count;
+extern mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state;
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count;
#ifdef HAVE_OPENSSL
extern mysql_mutex_t LOCK_des_key_file;
@@ -671,6 +676,7 @@ inline int set_current_thd(THD *thd)
extern handlerton *maria_hton;
extern uint extra_connection_count;
+extern uint64 global_gtid_counter;
extern my_bool opt_userstat_running, debug_assert_if_crashed_table;
extern uint mysqld_extra_port;
extern ulong opt_progress_report_time;
diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc
index 86f19f1b28e..334de1337d6 100644
--- a/sql/repl_failsafe.cc
+++ b/sql/repl_failsafe.cc
@@ -87,14 +87,15 @@ void change_rpl_status(ulong from_status, ulong to_status)
void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
{
- if (thd->server_id)
+ uint32 thd_server_id= thd->variables.server_id;
+ if (thd_server_id)
{
if (need_mutex)
mysql_mutex_lock(&LOCK_slave_list);
SLAVE_INFO* old_si;
if ((old_si = (SLAVE_INFO*)my_hash_search(&slave_list,
- (uchar*)&thd->server_id, 4)) &&
+ (uchar*)&thd_server_id, 4)) &&
(!only_mine || old_si->thd == thd))
my_hash_delete(&slave_list, (uchar*)old_si);
@@ -125,7 +126,7 @@ int register_slave(THD* thd, uchar* packet, uint packet_length)
if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
goto err2;
- thd->server_id= si->server_id= uint4korr(p);
+ thd->variables.server_id= si->server_id= uint4korr(p);
p+= 4;
get_object(p,si->host, "Failed to register slave: too long 'report-host'");
get_object(p,si->user, "Failed to register slave: too long 'report-user'");
@@ -143,7 +144,7 @@ int register_slave(THD* thd, uchar* packet, uint packet_length)
// si->rpl_recovery_rank= uint4korr(p);
p += 4;
if (!(si->master_id= uint4korr(p)))
- si->master_id= server_id;
+ si->master_id= global_system_variables.server_id;
si->thd= thd;
mysql_mutex_lock(&LOCK_slave_list);
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
new file mode 100644
index 00000000000..d6a6ed90bd3
--- /dev/null
+++ b/sql/rpl_gtid.cc
@@ -0,0 +1,1122 @@
+/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+/* Definitions for MariaDB global transaction ID (GTID). */
+
+
+#include "sql_priv.h"
+#include "my_sys.h"
+#include "unireg.h"
+#include "my_global.h"
+#include "sql_base.h"
+#include "sql_parse.h"
+#include "key.h"
+#include "rpl_gtid.h"
+#include "rpl_rli.h"
+
+
+const LEX_STRING rpl_gtid_slave_state_table_name=
+ { C_STRING_WITH_LEN("rpl_slave_state") };
+
+
+void
+rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
+{
+ int err;
+ /*
+ Add the gtid to the HASH in the replication slave state.
+
+ We must do this only _after_ commit, so that for parallel replication,
+ there will not be an attempt to delete the corresponding table row before
+ it is even committed.
+ */
+ lock();
+ err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
+ unlock();
+ if (err)
+ {
+ sql_print_warning("Slave: Out of memory during slave state maintenance. "
+ "Some no longer necessary rows in table "
+ "mysql.%s may be left undeleted.",
+ rpl_gtid_slave_state_table_name.str);
+ /*
+ Such failure is not fatal. We will fail to delete the row for this
+ GTID, but it will do no harm and will be removed automatically on next
+ server restart.
+ */
+ }
+}
+
+
+int
+rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli)
+{
+ uint64 sub_id;
+
+ /*
+ Update the GTID position, if we have it and did not already update
+ it in a GTID transaction.
+ */
+ if ((sub_id= rli->gtid_sub_id))
+ {
+ rli->gtid_sub_id= 0;
+ if (record_gtid(thd, &rli->current_gtid, sub_id, false))
+ return 1;
+ update_state_hash(sub_id, &rli->current_gtid);
+ }
+ return 0;
+}
+
+
+rpl_slave_state::rpl_slave_state()
+ : inited(false), loaded(false)
+{
+ my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
+ sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+}
+
+
+rpl_slave_state::~rpl_slave_state()
+{
+}
+
+
+void
+rpl_slave_state::init()
+{
+ DBUG_ASSERT(!inited);
+ mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW);
+ inited= true;
+}
+
+
+void
+rpl_slave_state::truncate_hash()
+{
+ uint32 i;
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ list_element *l= e->list;
+ list_element *next;
+ while (l)
+ {
+ next= l->next;
+ my_free(l);
+ l= next;
+ }
+ /* The element itself is freed by the hash element free function. */
+ }
+ my_hash_reset(&hash);
+}
+
+void
+rpl_slave_state::deinit()
+{
+ if (!inited)
+ return;
+ truncate_hash();
+ my_hash_free(&hash);
+ mysql_mutex_destroy(&LOCK_slave_state);
+}
+
+
+int
+rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
+ uint64 seq_no)
+{
+ element *elem= NULL;
+ list_element *list_elem= NULL;
+
+ if (!(elem= get_element(domain_id)))
+ return 1;
+
+ if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
+ return 1;
+ list_elem->server_id= server_id;
+ list_elem->sub_id= sub_id;
+ list_elem->seq_no= seq_no;
+
+ elem->add(list_elem);
+ return 0;
+}
+
+
+struct rpl_slave_state::element *
+rpl_slave_state::get_element(uint32 domain_id)
+{
+ struct element *elem;
+
+ elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
+ if (elem)
+ return elem;
+
+ if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
+ return NULL;
+ elem->list= NULL;
+ elem->last_sub_id= 0;
+ elem->domain_id= domain_id;
+ if (my_hash_insert(&hash, (uchar *)elem))
+ {
+ my_free(elem);
+ return NULL;
+ }
+ return elem;
+}
+
+
+int
+rpl_slave_state::truncate_state_table(THD *thd)
+{
+ TABLE_LIST tlist;
+ int err= 0;
+ TABLE *table;
+
+ mysql_reset_thd_for_next_command(thd, 0);
+
+ tlist.init_one_table(STRING_WITH_LEN("mysql"),
+ rpl_gtid_slave_state_table_name.str,
+ rpl_gtid_slave_state_table_name.length,
+ NULL, TL_WRITE);
+ if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ {
+ table= tlist.table;
+ table->no_replicate= 1;
+ err= table->file->ha_truncate();
+
+ if (err)
+ {
+ ha_rollback_trans(thd, FALSE);
+ close_thread_tables(thd);
+ ha_rollback_trans(thd, TRUE);
+ }
+ else
+ {
+ ha_commit_trans(thd, FALSE);
+ close_thread_tables(thd);
+ ha_commit_trans(thd, TRUE);
+ }
+ thd->mdl_context.release_transactional_locks();
+ }
+
+ return err;
+}
+
+
+static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
+ { { C_STRING_WITH_LEN("domain_id") },
+ { C_STRING_WITH_LEN("int(10) unsigned") },
+ {NULL, 0} },
+ { { C_STRING_WITH_LEN("sub_id") },
+ { C_STRING_WITH_LEN("bigint(20) unsigned") },
+ {NULL, 0} },
+ { { C_STRING_WITH_LEN("server_id") },
+ { C_STRING_WITH_LEN("int(10) unsigned") },
+ {NULL, 0} },
+ { { C_STRING_WITH_LEN("seq_no") },
+ { C_STRING_WITH_LEN("bigint(20) unsigned") },
+ {NULL, 0} },
+};
+
+static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};
+
+static const TABLE_FIELD_DEF mysql_rpl_slave_state_tabledef= {
+ array_elements(mysql_rpl_slave_state_coltypes),
+ mysql_rpl_slave_state_coltypes,
+ array_elements(mysql_rpl_slave_state_pk_parts),
+ mysql_rpl_slave_state_pk_parts
+};
+
+class Gtid_db_intact : public Table_check_intact
+{
+protected:
+ void report_error(uint, const char *fmt, ...)
+ {
+ va_list args;
+ va_start(args, fmt);
+ error_log_print(ERROR_LEVEL, fmt, args);
+ va_end(args);
+ }
+};
+
+static Gtid_db_intact gtid_table_intact;
+
+/*
+ Check that the mysql.rpl_slave_state table has the correct definition.
+*/
+int
+gtid_check_rpl_slave_state_table(TABLE *table)
+{
+ int err;
+
+ if ((err= gtid_table_intact.check(table, &mysql_rpl_slave_state_tabledef)))
+ my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
+ rpl_gtid_slave_state_table_name.str);
+ return err;
+}
+
+
+/*
+ Write a gtid to the replication slave state table.
+
+ Do it as part of the transaction, to get slave crash safety, or as a separate
+ transaction if !in_transaction (eg. MyISAM or DDL).
+
+ gtid The global transaction id for this event group.
+ sub_id Value allocated within the sub_id when the event group was
+ read (sub_id must be consistent with commit order in master binlog).
+
+ Note that caller must later ensure that the new gtid and sub_id is inserted
+ into the appropriate HASH element with rpl_slave_state.add(), so that it can
+ be deleted later. But this must only be done after COMMIT if in transaction.
+*/
+int
+rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
+ bool in_transaction)
+{
+ TABLE_LIST tlist;
+ int err= 0;
+ bool table_opened= false;
+ TABLE *table;
+ list_element *elist= 0, *next;
+ element *elem;
+ ulonglong thd_saved_option= thd->variables.option_bits;
+ Query_tables_list lex_backup;
+
+ mysql_reset_thd_for_next_command(thd, 0);
+
+ DBUG_EXECUTE_IF("gtid_inject_record_gtid",
+ {
+ my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0));
+ return 1;
+ } );
+
+ thd->lex->reset_n_backup_query_tables_list(&lex_backup);
+ tlist.init_one_table(STRING_WITH_LEN("mysql"),
+ rpl_gtid_slave_state_table_name.str,
+ rpl_gtid_slave_state_table_name.length,
+ NULL, TL_WRITE);
+ if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ goto end;
+ table_opened= true;
+ table= tlist.table;
+
+ if ((err= gtid_check_rpl_slave_state_table(table)))
+ goto end;
+
+ table->no_replicate= 1;
+ if (!in_transaction)
+ thd->variables.option_bits&=
+ ~(ulonglong)(OPTION_NOT_AUTOCOMMIT|OPTION_BEGIN);
+
+ bitmap_set_all(table->write_set);
+
+ table->field[0]->store((ulonglong)gtid->domain_id, true);
+ table->field[1]->store(sub_id, true);
+ table->field[2]->store((ulonglong)gtid->server_id, true);
+ table->field[3]->store(gtid->seq_no, true);
+ DBUG_EXECUTE_IF("inject_crash_before_write_rpl_slave_state", DBUG_SUICIDE(););
+ if ((err= table->file->ha_write_row(table->record[0])))
+ goto end;
+
+ lock();
+ if ((elem= get_element(gtid->domain_id)) == NULL)
+ {
+ unlock();
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ err= 1;
+ goto end;
+ }
+ elist= elem->grab_list();
+ unlock();
+
+ if (!elist)
+ goto end;
+
+ /* Now delete any already committed rows. */
+ bitmap_set_bit(table->read_set, table->field[0]->field_index);
+ bitmap_set_bit(table->read_set, table->field[1]->field_index);
+
+ if ((err= table->file->ha_index_init(0, 0)))
+ goto end;
+ while (elist)
+ {
+ uchar key_buffer[4+8];
+
+ next= elist->next;
+
+ table->field[1]->store(elist->sub_id, true);
+ /* domain_id is already set in table->record[0] from write_row() above. */
+ key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
+ if ((err= table->file->ha_index_read_map(table->record[1], key_buffer,
+ HA_WHOLE_KEY, HA_READ_KEY_EXACT)) ||
+ (err= table->file->ha_delete_row(table->record[1])))
+ break;
+ my_free(elist);
+ elist= next;
+ }
+ table->file->ha_index_end();
+
+ mysql_bin_log.bump_seq_no_counter_if_needed(gtid->seq_no);
+
+end:
+
+ if (table_opened)
+ {
+ if (err)
+ {
+ /*
+ ToDo: If error, we need to put any remaining elist back into the HASH so
+ we can do another delete attempt later.
+ */
+ ha_rollback_trans(thd, FALSE);
+ close_thread_tables(thd);
+ }
+ else
+ {
+ ha_commit_trans(thd, FALSE);
+ close_thread_tables(thd);
+ }
+ if (in_transaction)
+ thd->mdl_context.release_statement_locks();
+ else
+ thd->mdl_context.release_transactional_locks();
+ }
+ thd->lex->restore_backup_query_tables_list(&lex_backup);
+ thd->variables.option_bits= thd_saved_option;
+ return err;
+}
+
+
+uint64
+rpl_slave_state::next_subid(uint32 domain_id)
+{
+ uint32 sub_id= 0;
+ element *elem;
+
+ lock();
+ elem= get_element(domain_id);
+ if (elem)
+ sub_id= ++elem->last_sub_id;
+ unlock();
+
+ return sub_id;
+}
+
+
+bool
+rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first)
+{
+ if (*first)
+ *first= false;
+ else
+ if (dest->append(",",1))
+ return true;
+ return
+ dest->append_ulonglong(gtid->domain_id) ||
+ dest->append("-",1) ||
+ dest->append_ulonglong(gtid->server_id) ||
+ dest->append("-",1) ||
+ dest->append_ulonglong(gtid->seq_no);
+}
+
+
+/*
+ Prepare the current slave state as a string, suitable for sending to the
+ master to request to receive binlog events starting from that GTID state.
+
+ The state consists of the most recently applied GTID for each domain_id,
+ ie. the one with the highest sub_id within each domain_id.
+
+ Optinally, extra_gtids is a list of GTIDs from the binlog. This is used when
+ a server was previously a master and now needs to connect to a new master as
+ a slave. For each domain_id, if the GTID in the binlog was logged with our
+ own server_id _and_ has a higher seq_no than what is in the slave state,
+ then this should be used as the position to start replicating at. This
+ allows to promote a slave as new master, and connect the old master as a
+ slave with MASTER_GTID_POS=AUTO.
+*/
+
+int
+rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
+{
+ bool first= true;
+ uint32 i;
+ HASH gtid_hash;
+ uchar *rec;
+ rpl_gtid *gtid;
+ int res= 1;
+
+ my_hash_init(&gtid_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id),
+ sizeof(uint32), NULL, NULL, HASH_UNIQUE);
+ for (i= 0; i < num_extra; ++i)
+ if (extra_gtids[i].server_id == global_system_variables.server_id &&
+ my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
+ goto err;
+
+ lock();
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ uint64 best_sub_id;
+ rpl_gtid best_gtid;
+ element *e= (element *)my_hash_element(&hash, i);
+ list_element *l= e->list;
+
+ if (!l)
+ continue; /* Nothing here */
+
+ best_gtid.domain_id= e->domain_id;
+ best_gtid.server_id= l->server_id;
+ best_gtid.seq_no= l->seq_no;
+ best_sub_id= l->sub_id;
+ while ((l= l->next))
+ {
+ if (l->sub_id > best_sub_id)
+ {
+ best_sub_id= l->sub_id;
+ best_gtid.server_id= l->server_id;
+ best_gtid.seq_no= l->seq_no;
+ }
+ }
+
+ /* Check if we have something newer in the extra list. */
+ rec= my_hash_search(&gtid_hash, (const uchar *)&best_gtid.domain_id, 0);
+ if (rec)
+ {
+ gtid= (rpl_gtid *)rec;
+ if (gtid->seq_no > best_gtid.seq_no)
+ memcpy(&best_gtid, gtid, sizeof(best_gtid));
+ if (my_hash_delete(&gtid_hash, rec))
+ {
+ unlock();
+ goto err;
+ }
+ }
+
+ if (rpl_slave_state_tostring_helper(dest, &best_gtid, &first))
+ {
+ unlock();
+ goto err;
+ }
+ }
+
+ unlock();
+
+ /* Also add any remaining extra domain_ids. */
+ for (i= 0; i < gtid_hash.records; ++i)
+ {
+ gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
+ if (rpl_slave_state_tostring_helper(dest, gtid, &first))
+ goto err;
+ }
+
+ res= 0;
+
+err:
+ my_hash_free(&gtid_hash);
+
+ return res;
+}
+
+
+/*
+ Lookup a domain_id in the current replication slave state.
+
+ Returns false if the domain_id has no entries in the slave state.
+ Otherwise returns true, and fills in out_gtid with the corresponding
+ GTID.
+*/
+bool
+rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
+{
+ element *elem;
+ list_element *list;
+ uint64 best_sub_id;
+
+ lock();
+ elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
+ if (!elem || !(list= elem->list))
+ {
+ unlock();
+ return false;
+ }
+
+ out_gtid->domain_id= domain_id;
+ out_gtid->server_id= list->server_id;
+ out_gtid->seq_no= list->seq_no;
+ best_sub_id= list->sub_id;
+
+ while ((list= list->next))
+ {
+ if (best_sub_id > list->sub_id)
+ continue;
+ best_sub_id= list->sub_id;
+ out_gtid->server_id= list->server_id;
+ out_gtid->seq_no= list->seq_no;
+ }
+
+ unlock();
+ return true;
+}
+
+
+/*
+ Parse a GTID at the start of a string, and update the pointer to point
+ at the first character after the parsed GTID.
+
+ GTID can be in short form with domain_id=0 implied, SERVERID-SEQNO.
+ Or long form, DOMAINID-SERVERID-SEQNO.
+
+ Returns 0 on ok, non-zero on parse error.
+*/
+static int
+gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid)
+{
+ char *q;
+ char *p= *ptr;
+ uint64 v1, v2, v3;
+ int err= 0;
+
+ q= end;
+ v1= (uint64)my_strtoll10(p, &q, &err);
+ if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-')
+ return 1;
+ p= q+1;
+ q= end;
+ v2= (uint64)my_strtoll10(p, &q, &err);
+ if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-')
+ return 1;
+ p= q+1;
+ q= end;
+ v3= (uint64)my_strtoll10(p, &q, &err);
+ if (err != 0)
+ return 1;
+
+ out_gtid->domain_id= v1;
+ out_gtid->server_id= v2;
+ out_gtid->seq_no= v3;
+ *ptr= q;
+ return 0;
+}
+
+
+/*
+ Update the slave replication state with the GTID position obtained from
+ master when connecting with old-style (filename,offset) position.
+
+ If RESET is true then all existing entries are removed. Otherwise only
+ domain_ids mentioned in the STATE_FROM_MASTER are changed.
+
+ Returns 0 if ok, non-zero if error.
+*/
+int
+rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
+ bool reset)
+{
+ char *end= state_from_master + len;
+
+ if (reset)
+ {
+ if (truncate_state_table(thd))
+ return 1;
+ truncate_hash();
+ }
+ if (state_from_master == end)
+ return 0;
+ for (;;)
+ {
+ rpl_gtid gtid;
+ uint64 sub_id;
+
+ if (gtid_parser_helper(&state_from_master, end, &gtid) ||
+ !(sub_id= next_subid(gtid.domain_id)) ||
+ record_gtid(thd, &gtid, sub_id, false) ||
+ update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no))
+ return 1;
+ if (state_from_master == end)
+ break;
+ if (*state_from_master != ',')
+ return 1;
+ ++state_from_master;
+ }
+ return 0;
+}
+
+
+bool
+rpl_slave_state::is_empty()
+{
+ uint32 i;
+ bool result= true;
+
+ lock();
+ for (i= 0; i < hash.records; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ if (e->list)
+ {
+ result= false;
+ break;
+ }
+ }
+ unlock();
+
+ return result;
+}
+
+
+rpl_binlog_state::rpl_binlog_state()
+{
+ my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
+ sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
+ MY_MUTEX_INIT_SLOW);
+}
+
+
+void
+rpl_binlog_state::reset()
+{
+ uint32 i;
+
+ for (i= 0; i < hash.records; ++i)
+ my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
+ my_hash_reset(&hash);
+}
+
+rpl_binlog_state::~rpl_binlog_state()
+{
+ reset();
+ my_hash_free(&hash);
+ mysql_mutex_destroy(&LOCK_binlog_state);
+}
+
+
+/*
+ Update replication state with a new GTID.
+
+ If the (domain_id, server_id) pair already exists, then the new GTID replaces
+ the old one for that domain id. Else a new entry is inserted.
+
+ Returns 0 for ok, 1 for error.
+*/
+int
+rpl_binlog_state::update(const struct rpl_gtid *gtid)
+{
+ rpl_gtid *lookup_gtid;
+ element *elem;
+
+ elem= (element *)my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0);
+ if (elem)
+ {
+ /*
+ By far the most common case is that successive events within same
+ replication domain have the same server id (it changes only when
+ switching to a new master). So save a hash lookup in this case.
+ */
+ if (likely(elem->last_gtid->server_id == gtid->server_id))
+ {
+ elem->last_gtid->seq_no= gtid->seq_no;
+ return 0;
+ }
+
+ lookup_gtid= (rpl_gtid *)
+ my_hash_search(&elem->hash, (const uchar *)&gtid->server_id, 0);
+ if (lookup_gtid)
+ {
+ lookup_gtid->seq_no= gtid->seq_no;
+ elem->last_gtid= lookup_gtid;
+ return 0;
+ }
+
+ /* Allocate a new GTID and insert it. */
+ lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
+ if (!lookup_gtid)
+ return 1;
+ memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
+ if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
+ {
+ my_free(lookup_gtid);
+ return 1;
+ }
+ elem->last_gtid= lookup_gtid;
+ return 0;
+ }
+
+ /* First time we see this domain_id; allocate a new element. */
+ elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
+ lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
+ if (elem && lookup_gtid)
+ {
+ elem->domain_id= gtid->domain_id;
+ my_hash_init(&elem->hash, &my_charset_bin, 32,
+ offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
+ HASH_UNIQUE);
+ elem->last_gtid= lookup_gtid;
+ memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
+ if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
+ {
+ lookup_gtid= NULL; /* Do not free. */
+ if (0 == my_hash_insert(&hash, (const uchar *)elem))
+ return 0;
+ }
+ my_hash_free(&elem->hash);
+ }
+
+ /* An error. */
+ if (elem)
+ my_free(elem);
+ if (lookup_gtid)
+ my_free(lookup_gtid);
+ return 1;
+}
+
+
+uint64
+rpl_binlog_state::seq_no_from_state()
+{
+ ulong i, j;
+ uint64 seq_no= 0;
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ for (j= 0; j < e->hash.records; ++j)
+ {
+ const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
+ if (gtid->seq_no > seq_no)
+ seq_no= gtid->seq_no;
+ }
+ }
+ return seq_no;
+}
+
+
+/*
+ Write binlog state to text file, so we can read it in again without having
+ to scan last binlog file (normal shutdown/startup, not crash recovery).
+
+ The most recent GTID within each domain_id is written after any other GTID
+ within this domain.
+*/
+int
+rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
+{
+ ulong i, j;
+ char buf[21];
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ size_t res;
+ element *e= (element *)my_hash_element(&hash, i);
+ for (j= 0; j <= e->hash.records; ++j)
+ {
+ const rpl_gtid *gtid;
+ if (j < e->hash.records)
+ {
+ gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
+ if (gtid == e->last_gtid)
+ continue;
+ }
+ else
+ gtid= e->last_gtid;
+
+ longlong10_to_str(gtid->seq_no, buf, 10);
+ res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf);
+ if (res == (size_t) -1)
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+
+int
+rpl_binlog_state::read_from_iocache(IO_CACHE *src)
+{
+ /* 10-digit - 10-digit - 20-digit \n \0 */
+ char buf[10+1+10+1+20+1+1];
+ char *p, *end;
+ rpl_gtid gtid;
+
+ reset();
+ for (;;)
+ {
+ size_t res= my_b_gets(src, buf, sizeof(buf));
+ if (!res)
+ break;
+ p= buf;
+ end= buf + res;
+ if (gtid_parser_helper(&p, end, &gtid))
+ return 1;
+ if (update(&gtid))
+ return 1;
+ }
+ return 0;
+}
+
+
+rpl_gtid *
+rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
+{
+ element *elem;
+ if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ return NULL;
+ return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0);
+}
+
+
+uint32
+rpl_binlog_state::count()
+{
+ uint32 c= 0;
+ uint32 i;
+
+ for (i= 0; i < hash.records; ++i)
+ c+= ((element *)my_hash_element(&hash, i))->hash.records;
+
+ return c;
+}
+
+
+int
+rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
+{
+ uint32 i, j, pos;
+
+ pos= 0;
+ for (i= 0; i < hash.records; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ for (j= 0; j <= e->hash.records; ++j)
+ {
+ const rpl_gtid *gtid;
+ if (j < e->hash.records)
+ {
+ gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
+ if (gtid == e->last_gtid)
+ continue;
+ }
+ else
+ gtid= e->last_gtid;
+
+ if (pos >= list_size)
+ return 1;
+ memcpy(&gtid_list[pos++], gtid, sizeof(*gtid));
+ }
+ }
+
+ return 0;
+}
+
+
+/*
+ Get a list of the most recently binlogged GTID, for each domain_id.
+
+ This can be used when switching from being a master to being a slave,
+ to know where to start replicating from the new master.
+
+ The returned list must be de-allocated with my_free().
+
+ Returns 0 for ok, non-zero for out-of-memory.
+*/
+int
+rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
+{
+ uint32 i;
+
+ *size= hash.records;
+ if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME))))
+ return 1;
+ for (i= 0; i < *size; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid));
+ }
+
+ return 0;
+}
+
+
+slave_connection_state::slave_connection_state()
+{
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, my_free,
+ HASH_UNIQUE);
+}
+
+
+slave_connection_state::~slave_connection_state()
+{
+ my_hash_free(&hash);
+}
+
+
+/*
+ Create a hash from the slave GTID state that is sent to master when slave
+ connects to start replication.
+
+ The state is sent as <GTID>,<GTID>,...,<GTID>, for example:
+
+ 0-2-112,1-4-1022
+
+ The state gives for each domain_id the GTID to start replication from for
+ the corresponding replication stream. So domain_id must be unique.
+
+ Returns 0 if ok, non-zero if error due to malformed input.
+
+ Note that input string is built by slave server, so it will not be incorrect
+ unless bug/corruption/malicious server. So we just need basic sanity check,
+ not fancy user-friendly error message.
+*/
+
+int
+slave_connection_state::load(char *slave_request, size_t len)
+{
+ char *p, *end;
+ uchar *rec;
+ rpl_gtid *gtid;
+ const rpl_gtid *gtid2;
+
+ my_hash_reset(&hash);
+ p= slave_request;
+ end= slave_request + len;
+ if (p == end)
+ return 0;
+ for (;;)
+ {
+ if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*gtid));
+ return 1;
+ }
+ gtid= (rpl_gtid *)rec;
+ if (gtid_parser_helper(&p, end, gtid))
+ {
+ my_free(rec);
+ my_error(ER_INCORRECT_GTID_STATE, MYF(0));
+ return 1;
+ }
+ if ((gtid2= (const rpl_gtid *)
+ my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0)))
+ {
+ my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id,
+ gtid->server_id, (ulonglong)gtid->seq_no, gtid2->domain_id,
+ gtid2->server_id, (ulonglong)gtid2->seq_no, gtid->domain_id);
+ my_free(rec);
+ return 1;
+ }
+ if (my_hash_insert(&hash, rec))
+ {
+ my_free(rec);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+ if (p == end)
+ break; /* Finished. */
+ if (*p != ',')
+ {
+ my_error(ER_INCORRECT_GTID_STATE, MYF(0));
+ return 1;
+ }
+ ++p;
+ }
+
+ return 0;
+}
+
+
+int
+slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
+{
+ uint32 i;
+
+ my_hash_reset(&hash);
+ for (i= 0; i < count; ++i)
+ if (update(&gtid_list[i]))
+ return 1;
+ return 0;
+}
+
+
+rpl_gtid *
+slave_connection_state::find(uint32 domain_id)
+{
+ return (rpl_gtid *) my_hash_search(&hash, (const uchar *)(&domain_id), 0);
+}
+
+
+int
+slave_connection_state::update(const rpl_gtid *in_gtid)
+{
+ rpl_gtid *new_gtid;
+ uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
+ if (rec)
+ {
+ memcpy(rec, in_gtid, sizeof(*in_gtid));
+ return 0;
+ }
+
+ if (!(new_gtid= (rpl_gtid *)my_malloc(sizeof(*new_gtid), MYF(MY_WME))))
+ return 1;
+ memcpy(new_gtid, in_gtid, sizeof(*new_gtid));
+ if (my_hash_insert(&hash, (uchar *)new_gtid))
+ {
+ my_free(new_gtid);
+ return 1;
+ }
+
+ return 0;
+}
+
+
+void
+slave_connection_state::remove(const rpl_gtid *in_gtid)
+{
+ uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
+#ifndef DBUG_OFF
+ bool err;
+ rpl_gtid *slave_gtid= (rpl_gtid *)rec;
+ DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */);
+ DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
+ DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
+#endif
+
+ IF_DBUG(err=, )
+ my_hash_delete(&hash, rec);
+ DBUG_ASSERT(!err);
+}
+
+
+int
+slave_connection_state::to_string(String *out_str)
+{
+ uint32 i;
+ bool first;
+
+ out_str->length(0);
+ first= true;
+ for (i= 0; i < hash.records; ++i)
+ {
+ const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i);
+ if (rpl_slave_state_tostring_helper(out_str, gtid, &first))
+ return 1;
+ }
+ return 0;
+}
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
new file mode 100644
index 00000000000..e63d8439803
--- /dev/null
+++ b/sql/rpl_gtid.h
@@ -0,0 +1,179 @@
+/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#ifndef RPL_GTID_H
+#define RPL_GTID_H
+
+/* Definitions for MariaDB global transaction ID (GTID). */
+
+
+extern const LEX_STRING rpl_gtid_slave_state_table_name;
+
+class String;
+
+struct rpl_gtid
+{
+ uint32 domain_id;
+ uint32 server_id;
+ uint64 seq_no;
+};
+
+
+enum enum_gtid_skip_type {
+ GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION
+};
+
+
+/*
+ Replication slave state.
+
+ For every independent replication stream (identified by domain_id), this
+ remembers the last gtid applied on the slave within this domain.
+
+ Since events are always committed in-order within a single domain, this is
+ sufficient to maintain the state of the replication slave.
+*/
+struct rpl_slave_state
+{
+ /* Elements in the list of GTIDs kept for each domain_id. */
+ struct list_element
+ {
+ struct list_element *next;
+ uint64 sub_id;
+ uint64 seq_no;
+ uint32 server_id;
+ };
+
+ /* Elements in the HASH that hold the state for one domain_id. */
+ struct element
+ {
+ struct list_element *list;
+ uint64 last_sub_id;
+ uint32 domain_id;
+
+ list_element *grab_list() { list_element *l= list; list= NULL; return l; }
+ void add(list_element *l)
+ {
+ l->next= list;
+ list= l;
+ if (last_sub_id < l->sub_id)
+ last_sub_id= l->sub_id;
+ }
+ };
+
+ /* Mapping from domain_id to its element. */
+ HASH hash;
+ /* Mutex protecting access to the state. */
+ mysql_mutex_t LOCK_slave_state;
+
+ bool inited;
+ bool loaded;
+
+ rpl_slave_state();
+ ~rpl_slave_state();
+
+ void init();
+ void deinit();
+ void truncate_hash();
+ ulong count() const { return hash.records; }
+ int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no);
+ int truncate_state_table(THD *thd);
+ int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
+ bool in_transaction);
+ uint64 next_subid(uint32 domain_id);
+ int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra);
+ bool domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid);
+ int load(THD *thd, char *state_from_master, size_t len, bool reset);
+ bool is_empty();
+
+ void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
+ void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
+
+ element *get_element(uint32 domain_id);
+
+ void update_state_hash(uint64 sub_id, rpl_gtid *gtid);
+ int record_and_update_gtid(THD *thd, Relay_log_info *rli);
+};
+
+
+/*
+ Binlog state.
+ This keeps the last GTID written to the binlog for every distinct
+ (domain_id, server_id) pair.
+ This will be logged at the start of the next binlog file as a
+ Gtid_list_log_event; this way, it is easy to find the binlog file
+ containing a gigen GTID, by simply scanning backwards from the newest
+ one until a lower seq_no is found in the Gtid_list_log_event at the
+ start of a binlog for the given domain_id and server_id.
+
+ We also remember the last logged GTID for every domain_id. This is used
+ to know where to start when a master is changed to a slave. As a side
+ effect, it also allows to skip a hash lookup in the very common case of
+ logging a new GTID with same server id as last GTID.
+*/
+struct rpl_binlog_state
+{
+ struct element {
+ uint32 domain_id;
+ HASH hash; /* Containing all server_id for one domain_id */
+ /* The most recent entry in the hash. */
+ rpl_gtid *last_gtid;
+ };
+ /* Mapping from domain_id to collection of elements. */
+ HASH hash;
+ /* Mutex protecting access to the state. */
+ mysql_mutex_t LOCK_binlog_state;
+
+ rpl_binlog_state();
+ ~rpl_binlog_state();
+
+ void reset();
+ int update(const struct rpl_gtid *gtid);
+ uint64 seq_no_from_state();
+ int write_to_iocache(IO_CACHE *dest);
+ int read_from_iocache(IO_CACHE *src);
+ uint32 count();
+ int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
+ int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
+ rpl_gtid *find(uint32 domain_id, uint32 server_id);
+};
+
+
+/*
+ Represent the GTID state that a slave connection to a master requests
+ the master to start sending binlog events from.
+*/
+struct slave_connection_state
+{
+ /* Mapping from domain_id to the GTID requested for that domain. */
+ HASH hash;
+
+ slave_connection_state();
+ ~slave_connection_state();
+
+ int load(char *slave_request, size_t len);
+ int load(const rpl_gtid *gtid_list, uint32 count);
+ rpl_gtid *find(uint32 domain_id);
+ int update(const rpl_gtid *in_gtid);
+ void remove(const rpl_gtid *gtid);
+ ulong count() const { return hash.records; }
+ int to_string(String *out_str);
+};
+
+extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
+ bool *first);
+extern int gtid_check_rpl_slave_state_table(TABLE *table);
+
+#endif /* RPL_GTID_H */
diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
index 1d21b3f9445..2777dabf451 100644
--- a/sql/rpl_handler.cc
+++ b/sql/rpl_handler.cc
@@ -176,7 +176,7 @@ void delegates_destroy()
plugins add to thd->lex will be automatically unlocked.
*/
#define FOREACH_OBSERVER(r, f, thd, args) \
- param.server_id= thd->server_id; \
+ param.server_id= thd->variables.server_id; \
/*
Use a struct to make sure that they are allocated adjacent, check
delete_dynamic().
@@ -348,7 +348,7 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
ulong hlen;
Binlog_transmit_param param;
param.flags= flags;
- param.server_id= thd->server_id;
+ param.server_id= thd->variables.server_id;
int ret= 0;
read_lock();
diff --git a/sql/rpl_injector.cc b/sql/rpl_injector.cc
index ec1a96e8a2b..a4b04d2e047 100644
--- a/sql/rpl_injector.cc
+++ b/sql/rpl_injector.cc
@@ -108,7 +108,7 @@ int injector::transaction::use_table(server_id_type sid, table tbl)
if ((error= check_state(TABLE_STATE)))
DBUG_RETURN(error);
- server_id_type save_id= m_thd->server_id;
+ server_id_type save_id= m_thd->variables.server_id;
m_thd->set_server_id(sid);
error= m_thd->binlog_write_table_map(tbl.get_table(),
tbl.is_transactional());
@@ -127,7 +127,7 @@ int injector::transaction::write_row (server_id_type sid, table tbl,
if (error)
DBUG_RETURN(error);
- server_id_type save_id= m_thd->server_id;
+ server_id_type save_id= m_thd->variables.server_id;
m_thd->set_server_id(sid);
error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(),
cols, colcnt, record);
@@ -146,7 +146,7 @@ int injector::transaction::delete_row(server_id_type sid, table tbl,
if (error)
DBUG_RETURN(error);
- server_id_type save_id= m_thd->server_id;
+ server_id_type save_id= m_thd->variables.server_id;
m_thd->set_server_id(sid);
error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(),
cols, colcnt, record);
@@ -165,7 +165,7 @@ int injector::transaction::update_row(server_id_type sid, table tbl,
if (error)
DBUG_RETURN(error);
- server_id_type save_id= m_thd->server_id;
+ server_id_type save_id= m_thd->variables.server_id;
m_thd->set_server_id(sid);
error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
cols, colcnt, before, after);
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 3e02b555dc0..25fadf20cce 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -37,7 +37,7 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
checksum_alg_before_fd(BINLOG_CHECKSUM_ALG_UNDEF),
connect_retry(DEFAULT_CONNECT_RETRY), inited(0), abort_slave(0),
slave_running(0), slave_run_id(0), sync_counter(0),
- heartbeat_period(0), received_heartbeats(0), master_id(0)
+ heartbeat_period(0), received_heartbeats(0), master_id(0), using_gtid(0)
{
host[0] = 0; user[0] = 0; password[0] = 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
@@ -62,7 +62,9 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
my_casedn_str(system_charset_info, cmp_connection_name.str);
}
- my_init_dynamic_array(&ignore_server_ids, sizeof(::server_id), 16, 16, MYF(0));
+ my_init_dynamic_array(&ignore_server_ids,
+ sizeof(global_system_variables.server_id), 16, 16,
+ MYF(0));
bzero((char*) &file, sizeof(file));
mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST);
@@ -142,6 +144,7 @@ void init_master_log_pos(Master_info* mi)
mi->master_log_name[0] = 0;
mi->master_log_pos = BIN_LOG_HEADER_SIZE; // skip magic number
+ mi->using_gtid= false;
/* Intentionally init ssl_verify_server_cert to 0, no option available */
mi->ssl_verify_server_cert= 0;
@@ -187,8 +190,13 @@ enum {
/* line for ssl_crl */
LINE_FOR_SSL_CRLPATH= 22,
- /* Number of lines currently used when saving master info file */
- LINES_IN_MASTER_INFO= LINE_FOR_SSL_CRLPATH
+ /* MySQL 5.6 fixed-position lines. */
+ LINE_FOR_FIRST_MYSQL_5_6=23,
+ LINE_FOR_LAST_MYSQL_5_6=23,
+ /* Reserved lines for MySQL future versions. */
+ LINE_FOR_LAST_MYSQL_FUTURE=33,
+ /* Number of (fixed-position) lines used when saving master info file */
+ LINES_IN_MASTER_INFO= LINE_FOR_LAST_MYSQL_FUTURE
};
int init_master_info(Master_info* mi, const char* master_info_fname,
@@ -316,7 +324,7 @@ file '%s')", fname);
int ssl= 0, ssl_verify_server_cert= 0;
float master_heartbeat_period= 0.0;
char *first_non_digit;
- char dummy_buf[HOSTNAME_LENGTH+1];
+ char buf[HOSTNAME_LENGTH+1];
/*
Starting from 4.1.x master.info has new format. Now its
@@ -410,7 +418,7 @@ file '%s')", fname);
(this is just a reservation to avoid future upgrade problems)
*/
if (lines >= LINE_FOR_MASTER_BIND &&
- init_strvar_from_file(dummy_buf, sizeof(dummy_buf), &mi->file, ""))
+ init_strvar_from_file(buf, sizeof(buf), &mi->file, ""))
goto errwithmsg;
/*
Starting from 6.0 list of server_id of ignorable servers might be
@@ -425,12 +433,12 @@ file '%s')", fname);
/* reserved */
if (lines >= LINE_FOR_MASTER_UUID &&
- init_strvar_from_file(dummy_buf, sizeof(dummy_buf), &mi->file, ""))
+ init_strvar_from_file(buf, sizeof(buf), &mi->file, ""))
goto errwithmsg;
/* Starting from 5.5 the master_retry_count may be in the repository. */
if (lines >= LINE_FOR_MASTER_RETRY_COUNT &&
- init_strvar_from_file(dummy_buf, sizeof(dummy_buf), &mi->file, ""))
+ init_strvar_from_file(buf, sizeof(buf), &mi->file, ""))
goto errwithmsg;
if (lines >= LINE_FOR_SSL_CRLPATH &&
@@ -439,6 +447,34 @@ file '%s')", fname);
init_strvar_from_file(mi->ssl_crlpath, sizeof(mi->ssl_crlpath),
&mi->file, "")))
goto errwithmsg;
+
+ /*
+ Starting with MariaDB 10.0, we use a key=value syntax, which is nicer
+ in several ways. But we leave a bunch of empty lines to accomodate
+ any future old-style additions in MySQL (this will make it easier for
+ users moving from MariaDB to MySQL, to not have MySQL try to
+ interpret a MariaDB key=value line.)
+ */
+ if (lines >= LINE_FOR_LAST_MYSQL_FUTURE)
+ {
+ uint i;
+ /* Skip lines used by / reserved for MySQL >= 5.6. */
+ for (i= LINE_FOR_FIRST_MYSQL_5_6; i <= LINE_FOR_LAST_MYSQL_FUTURE; ++i)
+ {
+ if (init_strvar_from_file(buf, sizeof(buf), &mi->file, ""))
+ goto errwithmsg;
+ }
+
+ /*
+ Parse any extra key=value lines.
+ Ignore unknown lines, to facilitate downgrades.
+ */
+ while (!init_strvar_from_file(buf, sizeof(buf), &mi->file, 0))
+ {
+ if (0 == strncmp(buf, STRING_WITH_LEN("using_gtid=")))
+ mi->using_gtid= (0 != atoi(buf + sizeof("using_gtid")));
+ }
+ }
}
#ifndef HAVE_OPENSSL
@@ -544,7 +580,7 @@ int flush_master_info(Master_info* mi,
char* ignore_server_ids_buf;
{
ignore_server_ids_buf=
- (char *) my_malloc((sizeof(::server_id) * 3 + 1) *
+ (char *) my_malloc((sizeof(global_system_variables.server_id) * 3 + 1) *
(1 + mi->ignore_server_ids.elements), MYF(MY_WME));
if (!ignore_server_ids_buf)
DBUG_RETURN(1);
@@ -578,7 +614,9 @@ int flush_master_info(Master_info* mi,
sprintf(heartbeat_buf, "%.3f", mi->heartbeat_period);
my_b_seek(file, 0L);
my_b_printf(file,
- "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n",
+ "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n"
+ "\n\n\n\n\n\n\n\n\n\n\n"
+ "using_gtid=%d\n",
LINES_IN_MASTER_INFO,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
mi->host, mi->user,
@@ -587,7 +625,7 @@ int flush_master_info(Master_info* mi,
mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert,
heartbeat_buf, "", ignore_server_ids_buf,
"", 0,
- mi->ssl_crl, mi->ssl_crlpath);
+ mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid);
my_free(ignore_server_ids_buf);
err= flush_io_cache(file);
if (sync_masterinfo_period && !err &&
@@ -675,7 +713,7 @@ bool check_master_connection_name(LEX_STRING *name)
file names without a prefix.
*/
-void create_logfile_name_with_suffix(char *res_file_name, uint length,
+void create_logfile_name_with_suffix(char *res_file_name, size_t length,
const char *info_file, bool append,
LEX_STRING *suffix)
{
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 108bd51ff47..318306bbbb5 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -127,6 +127,11 @@ class Master_info : public Slave_reporting_capability
ulonglong received_heartbeats; // counter of received heartbeat events
DYNAMIC_ARRAY ignore_server_ids;
ulong master_id;
+ /*
+ True if slave position is set using GTID state rather than old-style
+ file/offset binlog position.
+ */
+ bool using_gtid;
};
int init_master_info(Master_info* mi, const char* master_info_fname,
const char* slave_info_fname,
@@ -171,7 +176,7 @@ public:
};
bool check_master_connection_name(LEX_STRING *name);
-void create_logfile_name_with_suffix(char *res_file_name, uint length,
+void create_logfile_name_with_suffix(char *res_file_name, size_t length,
const char *info_file,
bool append,
LEX_STRING *suffix);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 2e74acc0345..cb1d7fb22de 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -32,6 +32,13 @@
static int count_relay_log_space(Relay_log_info* rli);
+/**
+ Current replication state (hash of last GTID executed, per replication
+ domain).
+*/
+rpl_slave_state rpl_global_gtid_slave_state;
+
+
// Defined in slave.cc
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
@@ -52,7 +59,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
abort_pos_wait(0), slave_run_id(0), sql_thd(0),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
- tables_to_lock(0), tables_to_lock_count(0),
+ gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0),
last_event_start_time(0), deferred_events(NULL),m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false),
m_annotate_event(0)
@@ -1092,7 +1099,8 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
if (until_condition == UNTIL_MASTER_POS)
{
- if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
+ if (ev && ev->server_id == (uint32) global_system_variables.server_id &&
+ !replicate_same_server_id)
DBUG_RETURN(FALSE);
log_name= group_master_log_name;
log_pos= (!ev)? group_master_log_pos :
@@ -1190,7 +1198,7 @@ bool Relay_log_info::cached_charset_compare(char *charset) const
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
- time_t event_creation_time)
+ time_t event_creation_time, THD *thd)
{
#ifndef DBUG_OFF
extern uint debug_not_change_ts_if_art_event;
@@ -1225,7 +1233,23 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
else
{
inc_group_relay_log_pos(event_master_log_pos);
+ if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, this))
+ {
+ report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
+ "Failed to update GTID state in %s.%s, slave state may become "
+ "inconsistent: %d: %s",
+ "mysql", rpl_gtid_slave_state_table_name.str,
+ thd->stmt_da->sql_errno(), thd->stmt_da->message());
+ /*
+ At this point we are not in a transaction (for example after DDL),
+ so we can not roll back. Anyway, normally updates to the slave
+ state table should not fail, and if they do, at least we made the
+ DBA aware of the problem in the error log.
+ */
+ }
+ DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
flush_relay_log_info(this);
+ DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
/*
Note that Rotate_log_event::do_apply_event() does not call this
function, so there is no chance that a fake rotate event resets
@@ -1357,4 +1381,139 @@ void Relay_log_info::slave_close_thread_tables(THD *thd)
clear_tables_to_lock();
DBUG_VOID_RETURN;
}
+
+
+int
+rpl_load_gtid_slave_state(THD *thd)
+{
+ TABLE_LIST tlist;
+ TABLE *table;
+ bool table_opened= false;
+ bool table_scanned= false;
+ struct local_element { uint64 sub_id; rpl_gtid gtid; };
+ struct local_element *entry;
+ HASH hash;
+ int err= 0;
+ uint32 i;
+ uint64 highest_seq_no= 0;
+ DBUG_ENTER("rpl_load_gtid_slave_state");
+
+ rpl_global_gtid_slave_state.lock();
+ bool loaded= rpl_global_gtid_slave_state.loaded;
+ rpl_global_gtid_slave_state.unlock();
+ if (loaded)
+ DBUG_RETURN(0);
+
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id),
+ sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+
+ mysql_reset_thd_for_next_command(thd, 0);
+
+ tlist.init_one_table(STRING_WITH_LEN("mysql"),
+ rpl_gtid_slave_state_table_name.str,
+ rpl_gtid_slave_state_table_name.length,
+ NULL, TL_READ);
+ if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ goto end;
+ table_opened= true;
+ table= tlist.table;
+
+ if ((err= gtid_check_rpl_slave_state_table(table)))
+ goto end;
+
+ bitmap_set_all(table->read_set);
+ if ((err= table->file->ha_rnd_init_with_error(1)))
+ goto end;
+ table_scanned= true;
+ for (;;)
+ {
+ uint32 domain_id, server_id;
+ uint64 sub_id, seq_no;
+ uchar *rec;
+
+ if ((err= table->file->ha_rnd_next(table->record[0])))
+ {
+ if (err == HA_ERR_RECORD_DELETED)
+ continue;
+ else if (err == HA_ERR_END_OF_FILE)
+ break;
+ else
+ goto end;
+ }
+ domain_id= (ulonglong)table->field[0]->val_int();
+ sub_id= (ulonglong)table->field[1]->val_int();
+ server_id= (ulonglong)table->field[2]->val_int();
+ seq_no= (ulonglong)table->field[3]->val_int();
+ DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n",
+ (unsigned)domain_id, (unsigned)server_id,
+ (ulong)seq_no, (ulong)sub_id));
+ if (seq_no > highest_seq_no)
+ highest_seq_no= seq_no;
+
+ if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ {
+ entry= (struct local_element *)rec;
+ if (entry->sub_id >= sub_id)
+ continue;
+ entry->sub_id= sub_id;
+ DBUG_ASSERT(entry->gtid.domain_id == domain_id);
+ entry->gtid.server_id= server_id;
+ entry->gtid.seq_no= seq_no;
+ }
+ else
+ {
+ if (!(entry= (struct local_element *)my_malloc(sizeof(*entry),
+ MYF(MY_WME))))
+ {
+ err= 1;
+ goto end;
+ }
+ entry->sub_id= sub_id;
+ entry->gtid.domain_id= domain_id;
+ entry->gtid.server_id= server_id;
+ entry->gtid.seq_no= seq_no;
+ if ((err= my_hash_insert(&hash, (uchar *)entry)))
+ {
+ my_free(entry);
+ goto end;
+ }
+ }
+ }
+
+ rpl_global_gtid_slave_state.lock();
+ for (i= 0; i < hash.records; ++i)
+ {
+ entry= (struct local_element *)my_hash_element(&hash, i);
+ if ((err= rpl_global_gtid_slave_state.update(entry->gtid.domain_id,
+ entry->gtid.server_id,
+ entry->sub_id,
+ entry->gtid.seq_no)))
+ {
+ rpl_global_gtid_slave_state.unlock();
+ goto end;
+ }
+ }
+ rpl_global_gtid_slave_state.loaded= true;
+ rpl_global_gtid_slave_state.unlock();
+
+ err= 0; /* Clear HA_ERR_END_OF_FILE */
+
+end:
+ if (table_scanned)
+ {
+ table->file->ha_index_or_rnd_end();
+ ha_commit_trans(thd, FALSE);
+ ha_commit_trans(thd, TRUE);
+ }
+ if (table_opened)
+ {
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ }
+ my_hash_free(&hash);
+ mysql_bin_log.bump_seq_no_counter_if_needed(highest_seq_no);
+ DBUG_RETURN(err);
+}
+
#endif
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 6144d37026b..7aff6720aac 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -307,6 +307,14 @@ public:
char slave_patternload_file[FN_REFLEN];
size_t slave_patternload_file_size;
+ /*
+ Current GTID being processed.
+ The sub_id gives the binlog order within one domain_id. A zero sub_id
+ means that there is no active GTID.
+ */
+ uint64 gtid_sub_id;
+ rpl_gtid current_gtid;
+
Relay_log_info(bool is_slave_recovery);
~Relay_log_info();
@@ -445,7 +453,7 @@ public:
the <code>Seconds_behind_master</code> field.
*/
void stmt_done(my_off_t event_log_pos,
- time_t event_creation_time);
+ time_t event_creation_time, THD *thd);
/**
@@ -584,4 +592,8 @@ private:
int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
+extern struct rpl_slave_state rpl_global_gtid_slave_state;
+
+int rpl_load_gtid_slave_state(THD *thd);
+
#endif /* RPL_RLI_H */
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index f1d5af44fd5..95caf1f43e5 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6746,4 +6746,22 @@ ER_SLAVE_STOPPED
eng "SLAVE '%.*s' stopped"
ER_SQL_DISCOVER_ERROR
eng "Engine %s failed to discover table %`-.192s.%`-.192s with '%s'"
-
+ER_FAILED_GTID_STATE_INIT
+ eng "Failed initializing replication GTID state"
+ER_INCORRECT_GTID_STATE
+ eng "Could not parse GTID list for GTID_POS"
+ER_CANNOT_UPDATE_GTID_STATE
+ eng "Could not update replication slave gtid state"
+ER_DUPLICATE_GTID_DOMAIN
+ eng "GTID %u-%u-%llu and %u-%u-%llu conflict (duplicate domain id %u)"
+ER_GTID_OPEN_TABLE_FAILED
+ eng "Failed to open %s.%s"
+ ger "Öffnen von %s.%s fehlgeschlagen"
+ER_GTID_POSITION_NOT_FOUND_IN_BINLOG
+ eng "Connecting slave requested to start from GTID %u-%u-%llu, which is not in the master's binlog"
+ER_CANNOT_LOAD_SLAVE_GTID_STATE
+ eng "Failed to load replication slave GTID state from table %s.%s"
+ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG
+ eng "Requested GTID_POS %u-%u-%llu conflicts with the binary log which contains a more recent GTID %u-%u-%llu. To use the requested GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog"
+ER_MASTER_GTID_POS_MISSING_DOMAIN
+ eng "Requested GTID_POS contains no value for replication domain %u. This conflicts with the binary log which contains GTID %u-%u-%llu. To use the requested GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog"
diff --git a/sql/slave.cc b/sql/slave.cc
index 41eb7247b8c..d0723c331df 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -162,8 +162,10 @@ static int terminate_slave_thread(THD *thd,
volatile uint *slave_running,
bool skip_lock);
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
-static bool send_show_master_info_header(THD *thd, bool full);
-static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full);
+static bool send_show_master_info_header(THD *thd, bool full,
+ size_t gtid_pos_length);
+static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
+ String *gtid_pos);
/*
Find out which replications threads are running
@@ -395,6 +397,7 @@ int init_recovery(Master_info* mi, const char** errmsg)
DBUG_RETURN(0);
}
+
/**
Convert slave skip errors bitmap into a printable string.
@@ -718,7 +721,7 @@ int start_slave_thread(
if (start_lock)
mysql_mutex_lock(start_lock);
- if (!server_id)
+ if (!global_system_variables.server_id)
{
if (start_cond)
mysql_cond_broadcast(start_cond);
@@ -796,6 +799,7 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
mysql_mutex_t *lock_io=0, *lock_sql=0, *lock_cond_io=0, *lock_cond_sql=0;
mysql_cond_t* cond_io=0, *cond_sql=0;
int error=0;
+ const char *errmsg;
DBUG_ENTER("start_slave_threads");
if (need_slave_mutex)
@@ -811,6 +815,22 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
lock_cond_sql = &mi->rli.run_lock;
}
+ /*
+ If we are using GTID and both SQL and IO threads are stopped, then get
+ rid of all relay logs.
+
+ Relay logs are not very useful when using GTID, except as a buffer
+ between the fetch in the IO thread and the apply in SQL thread. However
+ while one of the threads is running, they are in use and cannot be
+ removed.
+ */
+ if (mi->using_gtid && !mi->slave_running && !mi->rli.slave_running)
+ {
+ purge_relay_logs(&mi->rli, NULL, 0, &errmsg);
+ mi->master_log_name[0]= 0;
+ mi->master_log_pos= 0;
+ }
+
if (thread_mask & SLAVE_IO)
error= start_slave_thread(
#ifdef HAVE_PSI_INTERFACE
@@ -1421,7 +1441,8 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
(master_res= mysql_store_result(mysql)) &&
(master_row= mysql_fetch_row(master_res)))
{
- if ((::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) &&
+ if ((global_system_variables.server_id ==
+ (mi->master_id= strtoul(master_row[1], 0, 10))) &&
!mi->rli.replicate_same_server_id)
{
errmsg= "The slave I/O thread stops because master and slave have equal \
@@ -1801,6 +1822,133 @@ past_checksum:
after_set_capability:
#endif
+ /*
+ Request dump start from slave replication GTID state.
+
+ Only request GTID position the first time we connect after CHANGE MASTER
+ or after starting both IO or SQL thread.
+
+ Otherwise, if the IO thread was ahead of the SQL thread before the
+ restart or reconnect, we might end up re-fetching and hence re-applying
+ the same event(s) again.
+ */
+ if (mi->using_gtid && !mi->master_log_name[0])
+ {
+ int rc;
+ char str_buf[256];
+ String connect_state(str_buf, sizeof(str_buf), system_charset_info);
+ connect_state.length(0);
+
+ /*
+ Read the master @@GLOBAL.gtid_domain_id variable.
+ This is mostly to check that master is GTID aware, but we could later
+ perhaps use it to check that different multi-source masters are correctly
+ configured with distinct domain_id.
+ */
+ if (mysql_real_query(mysql,
+ STRING_WITH_LEN("SELECT @@GLOBAL.gtid_domain_id")) ||
+ !(master_res= mysql_store_result(mysql)) ||
+ !(master_row= mysql_fetch_row(master_res)))
+ {
+ err_code= mysql_errno(mysql);
+ errmsg= "The slave I/O thread stops because master does not support "
+ "MariaDB global transaction id. A fatal error is encountered when "
+ "it tries to SELECT @@GLOBAL.gtid_domain_id.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
+ mysql_free_result(master_res);
+ master_res= NULL;
+
+ connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"),
+ system_charset_info);
+ if (rpl_append_gtid_state(&connect_state, true))
+ {
+ err_code= ER_OUTOFMEMORY;
+ errmsg= "The slave I/O thread stops because a fatal out-of-memory "
+ "error is encountered when it tries to compute @slave_connect_state.";
+ sprintf(err_buff, "%s Error: Out of memory", errmsg);
+ goto err;
+ }
+ connect_state.append(STRING_WITH_LEN("'"), system_charset_info);
+
+ rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length());
+ if (rc)
+ {
+ err_code= mysql_errno(mysql);
+ if (is_network_error(err_code))
+ {
+ mi->report(ERROR_LEVEL, err_code,
+ "Setting @slave_connect_state failed with error: %s",
+ mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ /* Fatal error */
+ errmsg= "The slave I/O thread stops because a fatal error is "
+ "encountered when it tries to set @slave_connect_state.";
+ sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
+ goto err;
+ }
+ }
+ }
+ if (!mi->using_gtid)
+ {
+ /*
+ If we are not using GTID to connect this time, then instead request
+ the corresponding GTID position from the master, so that the user
+ can reconnect the next time using MASTER_GTID_POS=AUTO.
+ */
+ char quote_buf[2*sizeof(mi->master_log_name)+1];
+ char str_buf[28+2*sizeof(mi->master_log_name)+10];
+ String query(str_buf, sizeof(str_buf), system_charset_info);
+ query.length(0);
+
+ query.append("SELECT binlog_gtid_pos('");
+ escape_quotes_for_mysql(&my_charset_bin, quote_buf, sizeof(quote_buf),
+ mi->master_log_name, strlen(mi->master_log_name));
+ query.append(quote_buf);
+ query.append("',");
+ query.append_ulonglong(mi->master_log_pos);
+ query.append(")");
+
+ if (!mysql_real_query(mysql, query.c_ptr_safe(), query.length()) &&
+ (master_res= mysql_store_result(mysql)) &&
+ (master_row= mysql_fetch_row(master_res)) &&
+ (master_row[0] != NULL))
+ {
+ rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0],
+ strlen(master_row[0]), false);
+ }
+ else if (check_io_slave_killed(mi->io_thd, mi, NULL))
+ goto slave_killed_err;
+ else if (is_network_error(mysql_errno(mysql)))
+ {
+ mi->report(WARNING_LEVEL, mysql_errno(mysql),
+ "Get master GTID position failed with error: %s", mysql_error(mysql));
+ goto network_err;
+ }
+ else
+ {
+ /*
+ ToDo: If the master does not have the binlog_gtid_pos() function, it
+ just means that it is an old master with no GTID support, so we should
+ do nothing.
+
+ However, if binlog_gtid_pos() exists, but fails or returns NULL, then
+ it means that the requested position is not valid. We could use this
+ to catch attempts to replicate from within the middle of an event,
+ avoiding strange failures or possible corruption.
+ */
+ }
+ if (master_res)
+ {
+ mysql_free_result(master_res);
+ master_res= NULL;
+ }
+ }
+
err:
if (errmsg)
{
@@ -1994,7 +2142,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
DBUG_RETURN(0);
}
- int4store(pos, server_id); pos+= 4;
+ int4store(pos, global_system_variables.server_id); pos+= 4;
pos= net_store_data(pos, (uchar*) report_host, report_host_len);
pos= net_store_data(pos, (uchar*) report_user, report_user_len);
pos= net_store_data(pos, (uchar*) report_password, report_password_len);
@@ -2043,16 +2191,20 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
bool show_master_info(THD *thd, Master_info *mi, bool full)
{
DBUG_ENTER("show_master_info");
+ String gtid_pos;
- if (send_show_master_info_header(thd, full))
+ if (full && rpl_global_gtid_slave_state.tostring(&gtid_pos, NULL, 0))
+ DBUG_RETURN(TRUE);
+ if (send_show_master_info_header(thd, full, gtid_pos.length()))
DBUG_RETURN(TRUE);
- if (send_show_master_info_data(thd, mi, full))
+ if (send_show_master_info_data(thd, mi, full, &gtid_pos))
DBUG_RETURN(TRUE);
my_eof(thd);
DBUG_RETURN(FALSE);
}
-static bool send_show_master_info_header(THD *thd, bool full)
+static bool send_show_master_info_header(THD *thd, bool full,
+ size_t gtid_pos_length)
{
List<Item> field_list;
Protocol *protocol= thd->protocol;
@@ -2135,6 +2287,8 @@ static bool send_show_master_info_header(THD *thd, bool full)
sizeof(mi->ssl_crl)));
field_list.push_back(new Item_empty_string("Master_SSL_Crlpath",
sizeof(mi->ssl_crlpath)));
+ field_list.push_back(new Item_return_int("Using_Gtid", sizeof(ulong),
+ MYSQL_TYPE_LONG));
if (full)
{
field_list.push_back(new Item_return_int("Retried_transactions",
@@ -2147,6 +2301,7 @@ static bool send_show_master_info_header(THD *thd, bool full)
10, MYSQL_TYPE_LONG));
field_list.push_back(new Item_float("Slave_heartbeat_period",
0.0, 3, 10));
+ field_list.push_back(new Item_empty_string("Gtid_Pos", gtid_pos_length));
}
if (protocol->send_result_set_metadata(&field_list,
@@ -2156,7 +2311,8 @@ static bool send_show_master_info_header(THD *thd, bool full)
}
-static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full)
+static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
+ String *gtid_pos)
{
DBUG_ENTER("send_show_master_info_data");
@@ -2315,6 +2471,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full)
protocol->store(mi->ssl_ca, &my_charset_bin);
// Master_Ssl_Crlpath
protocol->store(mi->ssl_capath, &my_charset_bin);
+ protocol->store((uint32) (mi->using_gtid != 0));
if (full)
{
protocol->store((uint32) mi->rli.retried_trans);
@@ -2322,6 +2479,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full)
protocol->store((uint32) mi->rli.executed_entries);
protocol->store((uint32) mi->received_heartbeats);
protocol->store((double) mi->heartbeat_period, 3, &tmp);
+ protocol->store(gtid_pos->ptr(), gtid_pos->length(), &my_charset_bin);
}
mysql_mutex_unlock(&mi->rli.err_lock);
@@ -2364,11 +2522,19 @@ static int cmp_mi_by_name(const Master_info **arg1,
bool show_all_master_info(THD* thd)
{
uint i, elements;
+ String gtid_pos;
Master_info **tmp;
DBUG_ENTER("show_master_info");
mysql_mutex_assert_owner(&LOCK_active_mi);
- if (send_show_master_info_header(thd, 1))
+ gtid_pos.length(0);
+ if (rpl_append_gtid_state(&gtid_pos, true))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ DBUG_RETURN(TRUE);
+ }
+
+ if (send_show_master_info_header(thd, 1, gtid_pos.length()))
DBUG_RETURN(TRUE);
if (!(elements= master_info_index->master_info_hash.records))
@@ -2390,7 +2556,7 @@ bool show_all_master_info(THD* thd)
for (i= 0; i < elements; i++)
{
- if (send_show_master_info_data(thd, tmp[i], 1))
+ if (send_show_master_info_data(thd, tmp[i], 1, &gtid_pos))
DBUG_RETURN(TRUE);
}
@@ -2555,7 +2721,7 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
- int4store(buf + 6, server_id);
+ int4store(buf + 6, global_system_variables.server_id);
len = (uint) strlen(logname);
memcpy(buf + 10, logname,len);
if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
@@ -2764,7 +2930,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
has a Rotate etc).
*/
- thd->server_id = ev->server_id; // use the original server id for logging
+ /* Use the original server id for logging. */
+ thd->variables.server_id = ev->server_id;
thd->set_time(); // time the query
thd->lex->current_select= 0;
if (!ev->when)
@@ -3548,17 +3715,34 @@ err_during_init:
/*
Check the temporary directory used by commands like
LOAD DATA INFILE.
+
+ As the directory never changes during a mysqld run, we only
+ test this once and cache the result. This also resolve a race condition
+ when this can be run by multiple threads at the same time.
*/
+
+static bool check_temp_dir_run= 0;
+static int check_temp_dir_result= 0;
+
static
int check_temp_dir(char* tmp_file)
{
- int fd;
+ File fd;
+ int result= 1; // Assume failure
MY_DIR *dirp;
char tmp_dir[FN_REFLEN];
size_t tmp_dir_size;
DBUG_ENTER("check_temp_dir");
+ mysql_mutex_lock(&LOCK_thread_count);
+ if (check_temp_dir_run)
+ {
+ result= check_temp_dir_result;
+ goto end;
+ }
+ check_temp_dir_run= 1;
+
/*
Get the directory from the temporary file.
*/
@@ -3568,27 +3752,33 @@ int check_temp_dir(char* tmp_file)
Check if the directory exists.
*/
if (!(dirp=my_dir(tmp_dir,MYF(MY_WME))))
- DBUG_RETURN(1);
+ goto end;
my_dirend(dirp);
/*
- Check permissions to create a file.
+ Check permissions to create a file. We use O_TRUNC to ensure that
+ things works even if we happen to have and old file laying around.
*/
if ((fd= mysql_file_create(key_file_misc,
tmp_file, CREATE_MODE,
- O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
+ O_WRONLY | O_BINARY | O_TRUNC | O_NOFOLLOW,
MYF(MY_WME))) < 0)
- DBUG_RETURN(1);
+ goto end;
+ result= 0; // Directory name ok
/*
Clean up.
*/
mysql_file_close(fd, MYF(0));
mysql_file_delete(key_file_misc, tmp_file, MYF(0));
- DBUG_RETURN(0);
+end:
+ check_temp_dir_result= result;
+ mysql_mutex_unlock(&LOCK_thread_count);
+ DBUG_RETURN(result);
}
+
/**
Slave SQL thread entry point.
@@ -3750,6 +3940,15 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
goto err;
}
+ /* Load the set of seen GTIDs, if we did not already. */
+ if (rpl_load_gtid_slave_state(thd))
+ {
+ rli->report(ERROR_LEVEL, thd->stmt_da->sql_errno(),
+ "Unable to load replication GTID slave state from mysql.%s: %s",
+ rpl_gtid_slave_state_table_name.str, thd->stmt_da->message());
+ goto err;
+ }
+
/* execute init_slave variable */
if (opt_init_slave.length)
{
@@ -3975,7 +4174,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
}
DBUG_ASSERT(cev->inited_from_old);
thd->file_id = cev->file_id = mi->file_id++;
- thd->server_id = cev->server_id;
+ thd->variables.server_id = cev->server_id;
cev_not_written = 1;
if (unlikely(net_request_file(net,cev->fname)))
@@ -4587,16 +4786,18 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
Heartbeat is sent only after an event corresponding to the corrdinates
the heartbeat carries.
- Slave can not have a difference in coordinates except in the only
+ Slave can not have a higher coordinate except in the only
special case when mi->master_log_name, master_log_pos have never
been updated by Rotate event i.e when slave does not have any history
with the master (and thereafter mi->master_log_pos is NULL).
+ Slave can have lower coordinates, if some event from master was omitted.
+
TODO: handling `when' for SHOW SLAVE STATUS' snds behind
*/
if ((memcmp(mi->master_log_name, hb.get_log_ident(), hb.get_ident_len())
&& mi->master_log_name != NULL)
- || mi->master_log_pos != hb.log_pos)
+ || mi->master_log_pos > hb.log_pos)
{
/* missed events of heartbeat from the past */
error= ER_SLAVE_HEARTBEAT_FAILURE;
@@ -4648,7 +4849,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mysql_mutex_lock(log_lock);
s_id= uint4korr(buf + SERVER_ID_OFFSET);
- if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
+ if ((s_id == global_system_variables.server_id &&
+ !mi->rli.replicate_same_server_id) ||
/*
the following conjunction deals with IGNORE_SERVER_IDS, if set
If the master is on the ignore list, execution of
@@ -4679,7 +4881,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
IGNORE_SERVER_IDS it increments mi->master_log_pos
as well as rli->group_relay_log_pos.
*/
- if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
+ if (!(s_id == global_system_variables.server_id &&
+ !mi->rli.replicate_same_server_id) ||
(buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
buf[EVENT_TYPE_OFFSET] != STOP_EVENT))
@@ -5217,6 +5420,27 @@ static Log_event* next_event(Relay_log_info* rli)
inc_event_relay_log_pos()
*/
rli->future_event_relay_log_pos= my_b_tell(cur_log);
+ /*
+ For GTID, allocate a new sub_id for the given domain_id.
+ The sub_id must be allocated in increasing order of binlog order.
+ */
+ if (ev->get_type_code() == GTID_EVENT)
+ {
+ Gtid_log_event *gev= static_cast<Gtid_log_event *>(ev);
+ uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
+ if (!sub_id)
+ {
+ errmsg = "slave SQL thread aborted because of out-of-memory error";
+ if (hot_log)
+ mysql_mutex_unlock(log_lock);
+ goto err;
+ }
+ rli->gtid_sub_id= sub_id;
+ rli->current_gtid.server_id= gev->server_id;
+ rli->current_gtid.domain_id= gev->domain_id;
+ rli->current_gtid.seq_no= gev->seq_no;
+ }
+
if (hot_log)
mysql_mutex_unlock(log_lock);
DBUG_RETURN(ev);
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 7fe9429d664..b4c163f61ee 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -345,7 +345,7 @@ uint create_tmp_table_def_key(THD *thd, char *key,
const char *db, const char *table_name)
{
uint key_length= create_table_def_key(key, db, table_name);
- int4store(key + key_length, thd->server_id);
+ int4store(key + key_length, thd->variables.server_id);
int4store(key + key_length + 4, thd->variables.pseudo_thread_id);
key_length+= TMP_TABLE_KEY_EXTRA;
return key_length;
@@ -388,6 +388,14 @@ bool table_def_init(void)
init_tdc_psi_keys();
#endif
mysql_mutex_init(key_LOCK_open, &LOCK_open, MY_MUTEX_INIT_FAST);
+ mysql_mutex_record_order(&LOCK_active_mi, &LOCK_open);
+ /*
+ When we delete from the table_def_cache(), the free function
+ table_def_free_entry() is invoked from my_hash_delete(), which calls
+ free_table_share(), which may unload plugins, which can remove status
+ variables and hence takes LOCK_status. Record this locking order here.
+ */
+ mysql_mutex_record_order(&LOCK_open, &LOCK_status);
oldest_unused_share= &end_of_unused_share;
end_of_unused_share.prev= &oldest_unused_share;
@@ -2670,7 +2678,7 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
{
DBUG_PRINT("error",
("query_id: %lu server_id: %u pseudo_thread_id: %lu",
- (ulong) table->query_id, (uint) thd->server_id,
+ (ulong) table->query_id, (uint) thd->variables.server_id,
(ulong) thd->variables.pseudo_thread_id));
my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr());
DBUG_RETURN(TRUE);
@@ -5976,7 +5984,8 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton,
("table: '%s'.'%s' path: '%s' server_id: %u "
"pseudo_thread_id: %lu",
db, table_name, path,
- (uint) thd->server_id, (ulong) thd->variables.pseudo_thread_id));
+ (uint) thd->variables.server_id,
+ (ulong) thd->variables.pseudo_thread_id));
/* Create the cache_key for temporary tables */
key_length= create_tmp_table_def_key(thd, cache_key, db, table_name);
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 240c4e4d627..238bf46e528 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -950,7 +950,7 @@ THD::THD()
/* Variables with default values */
proc_info="login";
where= THD::DEFAULT_WHERE;
- server_id = ::server_id;
+ variables.server_id = global_system_variables.server_id;
slave_net = 0;
m_command=COM_CONNECT;
*scramble= '\0';
@@ -5302,7 +5302,7 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
size_t const len= pack_row(table, cols, row_data, record);
Rows_log_event* const ev=
- binlog_prepare_pending_rows_event(table, server_id, cols, colcnt,
+ binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt,
len, is_trans,
static_cast<Write_rows_log_event*>(0));
@@ -5346,7 +5346,7 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
#endif
Rows_log_event* const ev=
- binlog_prepare_pending_rows_event(table, server_id, cols, colcnt,
+ binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt,
before_size + after_size, is_trans,
static_cast<Update_rows_log_event*>(0));
@@ -5377,7 +5377,7 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
size_t const len= pack_row(table, cols, row_data, record);
Rows_log_event* const ev=
- binlog_prepare_pending_rows_event(table, server_id, cols, colcnt,
+ binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt,
len, is_trans,
static_cast<Delete_rows_log_event*>(0));
diff --git a/sql/sql_class.h b/sql/sql_class.h
index afe6b831466..99c6f381692 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -546,12 +546,19 @@ typedef struct system_variables
ulong tx_isolation;
ulong updatable_views_with_limit;
int max_user_connections;
+ ulong server_id;
/**
In slave thread we need to know in behalf of which
thread the query is being run to replicate temp tables properly
*/
my_thread_id pseudo_thread_id;
/**
+ When replicating an event group with GTID, keep these values around so
+ slave binlog can receive the same GTID as the original.
+ */
+ uint32 gtid_domain_id;
+ uint64 gtid_seq_no;
+ /**
Place holders to store Multi-source variables in sys_var.cc during
update and show of variables.
*/
@@ -1736,7 +1743,6 @@ private:
enum enum_server_command m_command;
public:
- uint32 server_id;
uint32 file_id; // for LOAD DATA INFILE
/* remote (peer) port */
uint16 peer_port;
@@ -1813,7 +1819,7 @@ public:
MY_BITMAP const* cols, size_t colcnt,
const uchar *old_data, const uchar *new_data);
- void set_server_id(uint32 sid) { server_id = sid; }
+ void set_server_id(uint32 sid) { variables.server_id = sid; }
/*
Member functions to handle pending event for row-level logging.
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index 363b0d77e41..50afc657ee6 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -220,7 +220,8 @@ struct LEX_MASTER_INFO
changed variable or if it should be left at old value
*/
enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE}
- ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt;
+ ssl, ssl_verify_server_cert, heartbeat_opt, repl_ignore_server_ids_opt,
+ use_gtid_opt;
void init()
{
@@ -236,7 +237,7 @@ struct LEX_MASTER_INFO
pos= relay_log_pos= server_id= port= connect_retry= 0;
heartbeat_period= 0;
ssl= ssl_verify_server_cert= heartbeat_opt=
- repl_ignore_server_ids_opt= LEX_MI_UNCHANGED;
+ repl_ignore_server_ids_opt= use_gtid_opt= LEX_MI_UNCHANGED;
}
};
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index e8268f42cc7..589c1a3b7b9 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -1463,10 +1463,10 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
/* TODO: The following has to be changed to an 8 byte integer */
pos = uint4korr(packet);
flags = uint2korr(packet + 4);
- thd->server_id=0; /* avoid suicide */
+ thd->variables.server_id=0; /* avoid suicide */
if ((slave_server_id= uint4korr(packet+6))) // mysqlbinlog.server_id==0
kill_zombie_dump_threads(slave_server_id);
- thd->server_id = slave_server_id;
+ thd->variables.server_id = slave_server_id;
general_log_print(thd, command, "Log: '%s' Pos: %ld", packet+10,
(long) pos);
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 0bae99c2a27..42d54044b5a 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -16,10 +16,12 @@
#include "sql_priv.h"
#include "unireg.h"
+#include "sql_base.h"
#include "sql_parse.h" // check_access
#ifdef HAVE_REPLICATION
#include "rpl_mi.h"
+#include "rpl_rli.h"
#include "sql_repl.h"
#include "sql_acl.h" // SUPER_ACL
#include "log_event.h"
@@ -81,7 +83,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
uint ident_len = (uint) strlen(p);
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
(do_checksum ? BINLOG_CHECKSUM_LEN : 0);
- int4store(header + SERVER_ID_OFFSET, server_id);
+ int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
@@ -505,6 +507,26 @@ get_mariadb_slave_capability(THD *thd)
/*
+ Get the value of the @slave_connect_state user variable into the supplied
+ String (this is the GTID connect state requested by the connecting slave).
+
+ Returns false if error (ie. slave did not set the variable and does not
+ want to use GTID to set start position), true if success.
+*/
+static bool
+get_slave_connect_state(THD *thd, String *out_str)
+{
+ bool null_value;
+
+ const LEX_STRING name= { C_STRING_WITH_LEN("slave_connect_state") };
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
+}
+
+
+/*
Function prepares and sends repliation heartbeat event.
@param net net object of THD
@@ -539,7 +561,7 @@ static int send_heartbeat_event(NET* net, String* packet,
uint ident_len = strlen(p);
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN +
(do_checksum ? BINLOG_CHECKSUM_LEN : 0);
- int4store(header + SERVER_ID_OFFSET, server_id);
+ int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, 0);
@@ -567,6 +589,643 @@ static int send_heartbeat_event(NET* net, String* packet,
}
+struct binlog_file_entry
+{
+ binlog_file_entry *next;
+ char *name;
+};
+
+static binlog_file_entry *
+get_binlog_list(MEM_ROOT *memroot)
+{
+ IO_CACHE *index_file;
+ char fname[FN_REFLEN];
+ size_t length;
+ binlog_file_entry *current_list= NULL, *e;
+ DBUG_ENTER("get_binlog_list");
+
+ if (!mysql_bin_log.is_open())
+ {
+ my_error(ER_NO_BINARY_LOGGING, MYF(0));
+ DBUG_RETURN(NULL);
+ }
+
+ mysql_bin_log.lock_index();
+ index_file=mysql_bin_log.get_index_file();
+ reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
+
+ /* The file ends with EOF or empty line */
+ while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
+ {
+ --length; /* Remove the newline */
+ if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) ||
+ !(e->name= strmake_root(memroot, fname, length)))
+ {
+ mysql_bin_log.unlock_index();
+ my_error(ER_OUTOFMEMORY, MYF(0), length + 1 + sizeof(*e));
+ DBUG_RETURN(NULL);
+ }
+ e->next= current_list;
+ current_list= e;
+ }
+ mysql_bin_log.unlock_index();
+
+ DBUG_RETURN(current_list);
+}
+
+/*
+ Find the Gtid_list_log_event at the start of a binlog.
+
+ NULL for ok, non-NULL error message for error.
+
+ If ok, then the event is returned in *out_gtid_list. This can be NULL if we
+ get back to binlogs written by old server version without GTID support. If
+ so, it means we have reached the point to start from, as no GTID events can
+ exist in earlier binlogs.
+*/
+static const char *
+get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
+{
+ Format_description_log_event init_fdle(BINLOG_VERSION);
+ Format_description_log_event *fdle;
+ Log_event *ev;
+ const char *errormsg = NULL;
+
+ *out_gtid_list= NULL;
+
+ if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
+ opt_master_verify_checksum)) ||
+ ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+ {
+ if (ev)
+ delete ev;
+ return "Could not read format description log event while looking for "
+ "GTID position in binlog";
+ }
+
+ fdle= static_cast<Format_description_log_event *>(ev);
+
+ for (;;)
+ {
+ Log_event_type typ;
+
+ ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum);
+ if (!ev)
+ {
+ errormsg= "Could not read GTID list event while looking for GTID "
+ "position in binlog";
+ break;
+ }
+ typ= ev->get_type_code();
+ if (typ == GTID_LIST_EVENT)
+ break; /* Done, found it */
+ delete ev;
+ if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
+ typ == FORMAT_DESCRIPTION_EVENT)
+ continue; /* Continue looking */
+
+ /* We did not find any Gtid_list_log_event, must be old binlog. */
+ ev= NULL;
+ break;
+ }
+
+ delete fdle;
+ *out_gtid_list= static_cast<Gtid_list_log_event *>(ev);
+ return errormsg;
+}
+
+
+/*
+ Check if every GTID requested by the slave is contained in this (or a later)
+ binlog file. Return true if so, false if not.
+
+ We do the check with a single scan of the list of GTIDs, avoiding the need
+ to build an in-memory hash or stuff like that.
+
+ We need to check that slave did not request GTID D-S-N1, when the
+ Gtid_list_log_event for this binlog file has D-S-N2 with N2 > N1.
+
+ In addition, we need to check that we do not have a GTID D-S-N3 in the
+ Gtid_list_log_event where D is not present in the requested slave state at
+ all. Since if D is not in requested slave state, it means that slave needs
+ to start at the very first GTID in domain D.
+*/
+static bool
+contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
+{
+ uint32 i;
+
+ for (i= 0; i < glev->count; ++i)
+ {
+ const rpl_gtid *gtid= st->find(glev->list[i].domain_id);
+ if (!gtid)
+ {
+ /*
+ The slave needs to start from the very beginning of this domain, which
+ is in an earlier binlog file. So we need to search back further.
+ */
+ return false;
+ }
+ if (gtid->server_id == glev->list[i].server_id &&
+ gtid->seq_no < glev->list[i].seq_no)
+ {
+ /*
+ The slave needs to receive gtid, but it is contained in an earlier
+ binlog file. So we need to search back further.
+ */
+ return false;
+ }
+ }
+
+ return true;
+}
+
+
+/*
+ Check the start GTID state requested by the slave against our binlog state.
+
+ Give an error if the slave requests something that we do not have in our
+ binlog.
+
+ T
+*/
+
+static int
+check_slave_start_position(THD *thd, slave_connection_state *st,
+ const char **errormsg, rpl_gtid *error_gtid)
+{
+ uint32 i;
+ bool found;
+ int err;
+ rpl_gtid **delete_list= NULL;
+ uint32 delete_idx= 0;
+ bool slave_state_loaded= false;
+ uint32 missing_domains= 0;
+ rpl_gtid missing_domain_gtid;
+
+ for (i= 0; i < st->hash.records; ++i)
+ {
+ rpl_gtid *slave_gtid= (rpl_gtid *)my_hash_element(&st->hash, i);
+ rpl_gtid master_gtid;
+ rpl_gtid master_replication_gtid;
+ rpl_gtid start_gtid;
+
+ if ((found= mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
+ slave_gtid->server_id,
+ &master_gtid)) &&
+ master_gtid.seq_no >= slave_gtid->seq_no)
+ continue;
+
+ if (!slave_state_loaded)
+ {
+ if (rpl_load_gtid_slave_state(thd))
+ {
+ *errormsg= "Failed to load replication slave GTID state";
+ err= ER_CANNOT_LOAD_SLAVE_GTID_STATE;
+ goto end;
+ }
+ slave_state_loaded= true;
+ }
+
+ if (!rpl_global_gtid_slave_state.domain_to_gtid(slave_gtid->domain_id,
+ &master_replication_gtid) ||
+ slave_gtid->server_id != master_replication_gtid.server_id ||
+ slave_gtid->seq_no != master_replication_gtid.seq_no)
+ {
+ rpl_gtid domain_gtid;
+
+ if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
+ &domain_gtid))
+ {
+ /*
+ We do not have anything in this domain, neither in the binlog nor
+ in the slave state. So we are probably one master in a multi-master
+ setup, and this domain is served by a different master.
+
+ This is not an error, however if we are missing _all_ domains
+ requested by the slave, then we still give error (below, after
+ the loop).
+ */
+ if (!missing_domains)
+ missing_domain_gtid= *slave_gtid;
+ ++missing_domains;
+ continue;
+ }
+ *errormsg= "Requested slave GTID state not found in binlog";
+ *error_gtid= *slave_gtid;
+ err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
+ goto end;
+ }
+
+ /*
+ Ok, so connecting slave asked to start at a GTID that we do not have in
+ our binlog, but it was in fact the last GTID we applied earlier, when we
+ were acting as a replication slave.
+
+ So this means that we were running as a replication slave without
+ --log-slave-updates, but now we switched to be a master. It is worth it
+ to handle this special case, as it allows users to run a simple
+ master -> slave without --log-slave-updates, and then exchange slave and
+ master, as long as they make sure the slave is caught up before switching.
+ */
+
+ /*
+ First check if we logged something ourselves as a master after being a
+ slave. This will be seen as a GTID with our own server_id and bigger
+ seq_no than what is in the slave state.
+
+ If we did not log anything ourselves, then start the connecting slave
+ replicating from the current binlog end position, which in this case
+ corresponds to our replication slave state and hence what the connecting
+ slave is requesting.
+ */
+ if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
+ global_system_variables.server_id,
+ &start_gtid) &&
+ start_gtid.seq_no > slave_gtid->seq_no)
+ {
+ /*
+ Start replication within this domain at the first GTID that we logged
+ ourselves after becoming a master.
+ */
+ slave_gtid->server_id= global_system_variables.server_id;
+ }
+ else if (mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
+ &start_gtid))
+ {
+ slave_gtid->server_id= start_gtid.server_id;
+ slave_gtid->seq_no= start_gtid.seq_no;
+ }
+ else
+ {
+ /*
+ We do not have _anything_ in our own binlog for this domain. Just
+ delete the entry in the slave connection state, then it will pick up
+ anything new that arrives.
+
+ We just queue up the deletion and do it later, after the loop, so that
+ we do not mess up the iteration over the hash.
+ */
+ if (!delete_list)
+ {
+ if ((delete_list= (rpl_gtid **)my_malloc(sizeof(*delete_list),
+ MYF(MY_WME))))
+ {
+ *errormsg= "Out of memory while checking slave start position";
+ err= ER_OUT_OF_RESOURCES;
+ goto end;
+ }
+ }
+ delete_list[delete_idx++]= slave_gtid;
+ }
+ }
+
+ if (missing_domains == st->hash.records && missing_domains > 0)
+ {
+ *errormsg= "Requested slave GTID state not found in binlog";
+ *error_gtid= missing_domain_gtid;
+ err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
+ goto end;
+ }
+
+ /* Do any delayed deletes from the hash. */
+ if (delete_list)
+ {
+ for (i= 0; i < delete_idx; ++i)
+ st->remove(delete_list[i]);
+ }
+ err= 0;
+
+end:
+ if (delete_list)
+ my_free(delete_list);
+ return err;
+}
+
+/*
+ Find the name of the binlog file to start reading for a slave that connects
+ using GTID state.
+
+ Returns the file name in out_name, which must be of size at least FN_REFLEN.
+
+ Returns NULL on ok, error message on error.
+
+ In case of non-error return, the returned binlog file is guaranteed to
+ contain the first event to be transmitted to the slave for every domain
+ present in our binlogs. It is still necessary to skip all GTIDs up to
+ and including the GTID requested by slave within each domain.
+
+ However, as a special case, if the event to be sent to the slave is the very
+ first event (within that domain) in the returned binlog, then nothing should
+ be skipped, so that domain is deleted from the passed in slave connection
+ state.
+
+ This is necessary in case the slave requests a GTID within a replication
+ domain that has long been inactive. The binlog file containing that GTID may
+ have been long since purged. However, as long as no GTIDs after that have
+ been purged, we have the GTID requested by slave in the Gtid_list_log_event
+ of the latest binlog. So we can start from there, as long as we delete the
+ corresponding entry in the slave state so we do not wrongly skip any events
+ that might turn up if that domain becomes active again, vainly looking for
+ the requested GTID that was already purged.
+*/
+static const char *
+gtid_find_binlog_file(slave_connection_state *state, char *out_name)
+{
+ MEM_ROOT memroot;
+ binlog_file_entry *list;
+ Gtid_list_log_event *glev= NULL;
+ const char *errormsg= NULL;
+ char buf[FN_REFLEN];
+
+ init_alloc_root(&memroot, 10*(FN_REFLEN+sizeof(binlog_file_entry)), 0,
+ MYF(MY_THREAD_SPECIFIC));
+ if (!(list= get_binlog_list(&memroot)))
+ {
+ errormsg= "Out of memory while looking for GTID position in binlog";
+ goto end;
+ }
+
+ while (list)
+ {
+ File file;
+ IO_CACHE cache;
+
+ if (!list->next)
+ {
+ /*
+ It should be safe to read the currently used binlog, as we will only
+ read the header part that is already written.
+
+ But if that does not work on windows, then we will need to cache the
+ event somewhere in memory I suppose - that could work too.
+ */
+ }
+ /*
+ Read the Gtid_list_log_event at the start of the binlog file to
+ get the binlog state.
+ */
+ if (normalize_binlog_name(buf, list->name, false))
+ {
+ errormsg= "Failed to determine binlog file name while looking for "
+ "GTID position in binlog";
+ goto end;
+ }
+ bzero((char*) &cache, sizeof(cache));
+ if ((file= open_binlog(&cache, buf, &errormsg)) == (File)-1)
+ goto end;
+ errormsg= get_gtid_list_event(&cache, &glev);
+ end_io_cache(&cache);
+ mysql_file_close(file, MYF(MY_WME));
+ if (errormsg)
+ goto end;
+
+ if (!glev || contains_all_slave_gtid(state, glev))
+ {
+ uint32 i;
+
+ strmake(out_name, buf, FN_REFLEN);
+
+ /*
+ As a special case, we allow to start from binlog file N if the
+ requested GTID is the last event (in the corresponding domain) in
+ binlog file (N-1), but then we need to remove that GTID from the slave
+ state, rather than skipping events waiting for it to turn up.
+ */
+ for (i= 0; i < glev->count; ++i)
+ {
+ const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
+ if (!gtid)
+ {
+ /* contains_all_slave_gtid() would have returned false if so. */
+ DBUG_ASSERT(0);
+ continue;
+ }
+ if (gtid->server_id == glev->list[i].server_id &&
+ gtid->seq_no == glev->list[i].seq_no)
+ {
+ /*
+ The slave requested to start from the very beginning of this
+ domain in this binlog file. So delete the entry from the state,
+ we do not need to skip anything.
+ */
+ state->remove(gtid);
+ }
+ }
+
+ goto end;
+ }
+ delete glev;
+ glev= NULL;
+ list= list->next;
+ }
+
+ /* We reached the end without finding anything. */
+ errormsg= "Could not find GTID state requested by slave in any binlog "
+ "files. Probably the slave state is too old and required binlog files "
+ "have been purged.";
+
+end:
+ if (glev)
+ delete glev;
+
+ free_root(&memroot, MYF(0));
+ return errormsg;
+}
+
+
+/*
+ Given an old-style binlog position with file name and file offset, find the
+ corresponding gtid position. If the offset is not at an event boundary, give
+ an error.
+
+ Return NULL on ok, error message string on error.
+
+ ToDo: Improve the performance of this by using binlog index files.
+*/
+static const char *
+gtid_state_from_pos(const char *name, uint32 offset,
+ slave_connection_state *gtid_state)
+{
+ IO_CACHE cache;
+ File file;
+ const char *errormsg= NULL;
+ bool found_gtid_list_event= false;
+ bool found_format_description_event= false;
+ bool valid_pos= false;
+ uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
+ int err;
+ String packet;
+
+ if (gtid_state->load((const rpl_gtid *)NULL, 0))
+ {
+ errormsg= "Internal error (out of memory?) initializing slave state "
+ "while scanning binlog to find start position";
+ return errormsg;
+ }
+
+ if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1)
+ return errormsg;
+
+ /*
+ First we need to find the initial GTID_LIST_EVENT. We need this even
+ if the offset is at the very start of the binlog file.
+
+ But if we do not find any GTID_LIST_EVENT, then this is an old binlog
+ with no GTID information, so we return empty GTID state.
+ */
+ for (;;)
+ {
+ Log_event_type typ;
+ uint32 cur_pos;
+
+ cur_pos= (uint32)my_b_tell(&cache);
+ if (cur_pos == offset)
+ valid_pos= true;
+ if (found_format_description_event && found_gtid_list_event &&
+ cur_pos >= offset)
+ break;
+
+ packet.length(0);
+ err= Log_event::read_log_event(&cache, &packet, NULL,
+ current_checksum_alg);
+ if (err)
+ {
+ errormsg= "Could not read binlog while searching for slave start "
+ "position on master";
+ goto end;
+ }
+ /*
+ The cast to uchar is needed to avoid a signed char being converted to a
+ negative number.
+ */
+ typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET];
+ if (typ == FORMAT_DESCRIPTION_EVENT)
+ {
+ if (found_format_description_event)
+ {
+ errormsg= "Duplicate format description log event found while "
+ "searching for old-style position in binlog";
+ goto end;
+ }
+
+ current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length());
+ found_format_description_event= true;
+ }
+ else if (typ != FORMAT_DESCRIPTION_EVENT && !found_format_description_event)
+ {
+ errormsg= "Did not find format description log event while searching "
+ "for old-style position in binlog";
+ goto end;
+ }
+ else if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
+ typ == BINLOG_CHECKPOINT_EVENT)
+ continue; /* Continue looking */
+ else if (typ == GTID_LIST_EVENT)
+ {
+ rpl_gtid *gtid_list;
+ bool status;
+ uint32 list_len;
+
+ if (found_gtid_list_event)
+ {
+ errormsg= "Found duplicate Gtid_list_log_event while scanning binlog "
+ "to find slave start position";
+ goto end;
+ }
+ status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
+ &gtid_list, &list_len);
+ if (status)
+ {
+ errormsg= "Error reading Gtid_list_log_event while searching "
+ "for old-style position in binlog";
+ goto end;
+ }
+ err= gtid_state->load(gtid_list, list_len);
+ my_free(gtid_list);
+ if (err)
+ {
+ errormsg= "Internal error (out of memory?) initialising slave state "
+ "while scanning binlog to find start position";
+ goto end;
+ }
+ found_gtid_list_event= true;
+ }
+ else if (!found_gtid_list_event)
+ {
+ /* We did not find any Gtid_list_log_event, must be old binlog. */
+ goto end;
+ }
+ else if (typ == GTID_EVENT)
+ {
+ rpl_gtid gtid;
+ uchar flags2;
+ if (Gtid_log_event::peek(packet.ptr(), packet.length(), &gtid.domain_id,
+ &gtid.server_id, &gtid.seq_no, &flags2))
+ {
+ errormsg= "Corrupt gtid_log_event found while scanning binlog to find "
+ "initial slave position";
+ goto end;
+ }
+ if (gtid_state->update(&gtid))
+ {
+ errormsg= "Internal error (out of memory?) updating slave state while "
+ "scanning binlog to find start position";
+ goto end;
+ }
+ }
+ }
+
+ if (!valid_pos)
+ {
+ errormsg= "Slave requested incorrect position in master binlog. "
+ "Requested position %u in file '%s', but this position does not "
+ "correspond to the location of any binlog event.";
+ }
+
+end:
+ end_io_cache(&cache);
+ mysql_file_close(file, MYF(MY_WME));
+
+ return errormsg;
+}
+
+
+int
+gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
+{
+ slave_connection_state gtid_state;
+ const char *lookup_name;
+ char name_buf[FN_REFLEN];
+ LOG_INFO linfo;
+
+ if (!mysql_bin_log.is_open())
+ {
+ my_error(ER_NO_BINARY_LOGGING, MYF(0));
+ return 1;
+ }
+
+ if (in_name && in_name[0])
+ {
+ mysql_bin_log.make_log_name(name_buf, in_name);
+ lookup_name= name_buf;
+ }
+ else
+ lookup_name= NULL;
+ linfo.index_file_offset= 0;
+ if (mysql_bin_log.find_log_pos(&linfo, lookup_name, 1))
+ return 1;
+
+ if (pos < 4)
+ pos= 4;
+
+ if (gtid_state_from_pos(linfo.log_file_name, pos, &gtid_state) ||
+ gtid_state.to_string(out_str))
+ return 1;
+ return 0;
+}
+
+
/*
Helper function for mysql_binlog_send() to write an event down the slave
connection.
@@ -577,9 +1236,63 @@ static const char *
send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Log_event_type event_type, char *log_file_name,
IO_CACHE *log, int mariadb_slave_capability,
- ulong ev_offset, uint8 current_checksum_alg)
+ ulong ev_offset, uint8 current_checksum_alg,
+ bool using_gtid_state, slave_connection_state *gtid_state,
+ enum_gtid_skip_type *gtid_skip_group)
{
my_off_t pos;
+ size_t len= packet->length();
+
+ /* Skip GTID event groups until we reach slave position within a domain_id. */
+ if (event_type == GTID_EVENT && using_gtid_state && gtid_state->count() > 0)
+ {
+ uint32 server_id, domain_id;
+ uint64 seq_no;
+ uchar flags2;
+ rpl_gtid *gtid;
+
+ if (ev_offset > len ||
+ Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
+ &domain_id, &server_id, &seq_no, &flags2))
+ return "Failed to read Gtid_log_event: corrupt binlog";
+ gtid= gtid_state->find(domain_id);
+ if (gtid != NULL)
+ {
+ /* Skip this event group if we have not yet reached slave start pos. */
+ if (server_id != gtid->server_id || seq_no <= gtid->seq_no)
+ *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ /*
+ Delete this entry if we have reached slave start position (so we will
+ not skip subsequent events and won't have to look them up and check).
+ */
+ if (server_id == gtid->server_id && seq_no >= gtid->seq_no)
+ gtid_state->remove(gtid);
+ }
+ }
+
+ /*
+ Skip event group if we have not yet reached the correct slave GTID position.
+
+ Note that slave that understands GTID can also tolerate holes, so there is
+ no need to supply dummy event.
+ */
+ switch (*gtid_skip_group)
+ {
+ case GTID_SKIP_STANDALONE:
+ if (!Log_event::is_part_of_group(event_type))
+ *gtid_skip_group= GTID_SKIP_NOT;
+ return NULL;
+ case GTID_SKIP_TRANSACTION:
+ if (event_type == XID_EVENT ||
+ (event_type == QUERY_EVENT &&
+ Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset,
+ len - ev_offset)))
+ *gtid_skip_group= GTID_SKIP_NOT;
+ return NULL;
+ case GTID_SKIP_NOT:
+ break;
+ }
/* Do not send annotate_rows events unless slave requested it. */
if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
@@ -616,10 +1329,34 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
}
/*
- Do not send binlog checkpoint events to a slave that does not understand it.
+ Replace GTID events with old-style BEGIN events for slaves that do not
+ understand global transaction IDs. For stand-alone events, where there is
+ no terminating COMMIT query event, omit the GTID event or replace it with
+ a dummy event, as appropriate.
+ */
+ if (event_type == GTID_EVENT &&
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID)
+ {
+ bool need_dummy=
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES;
+ bool err= Gtid_log_event::make_compatible_event(packet, &need_dummy,
+ ev_offset,
+ current_checksum_alg);
+ if (err)
+ return "Failed to replace GTID event with backwards-compatible event: "
+ "currupt event.";
+ if (!need_dummy)
+ return NULL;
+ }
+
+ /*
+ Do not send binlog checkpoint or gtid list events to a slave that does not
+ understand it.
*/
- if (unlikely(event_type == BINLOG_CHECKPOINT_EVENT) &&
- mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT)
+ if ((unlikely(event_type == BINLOG_CHECKPOINT_EVENT) &&
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) ||
+ (unlikely(event_type == GTID_LIST_EVENT) &&
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID))
{
if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
{
@@ -634,8 +1371,8 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
binlog positions.
*/
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
- return "Failed to replace binlog checkpoint event with dummy: "
- "too small event.";
+ return "Failed to replace binlog checkpoint or gtid list event with "
+ "dummy: too small event.";
}
}
@@ -661,7 +1398,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
(thd, flags, packet, log_file_name, pos)))
return "run 'before_send_event' hook failed";
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ if (my_net_write(net, (uchar*) packet->ptr(), len))
return "Failed on my_net_write()";
DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
@@ -696,6 +1433,12 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
mysql_mutex_t *log_lock;
mysql_cond_t *log_cond;
int mariadb_slave_capability;
+ char str_buf[256];
+ String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
+ bool using_gtid_state;
+ slave_connection_state gtid_state, return_gtid_state;
+ rpl_gtid error_gtid;
+ enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
@@ -706,6 +1449,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log));
+ bzero(&error_gtid, sizeof(error_gtid));
/*
heartbeat_period from @master_heartbeat_period user variable
*/
@@ -722,9 +1466,25 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
set_timespec_nsec(*heartbeat_ts, 0);
}
mariadb_slave_capability= get_mariadb_slave_capability(thd);
+
+ connect_gtid_state.length(0);
+ using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+ DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;);
+ /*
+ We want to corrupt the first event, in Log_event::read_log_event().
+ But we do not want the corruption to happen early, eg. when client does
+ BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to
+ set the real DBUG injection here.
+ */
+ DBUG_EXECUTE_IF("corrupt_read_log_event2_set",
+ {
+ DBUG_SET("-d,corrupt_read_log_event2_set");
+ DBUG_SET("+d,corrupt_read_log_event2");
+ });
+
if (global_system_variables.log_warnings > 1)
sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
- thd->server_id, log_ident, (ulong)pos);
+ (int)thd->variables.server_id, log_ident, (ulong)pos);
if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
{
errmsg= "Failed to run hook 'transmit_start'";
@@ -755,10 +1515,36 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
name=search_file_name;
- if (log_ident[0])
- mysql_bin_log.make_log_name(search_file_name, log_ident);
+ if (using_gtid_state)
+ {
+ if (gtid_state.load(connect_gtid_state.c_ptr_quick(),
+ connect_gtid_state.length()))
+ {
+ errmsg= "Out of memory or malformed slave request when obtaining start "
+ "position from GTID state";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ if ((error= check_slave_start_position(thd, &gtid_state, &errmsg,
+ &error_gtid)))
+ {
+ my_errno= error;
+ goto err;
+ }
+ if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name)))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ goto err;
+ }
+ pos= 4;
+ }
else
- name=0; // Find first log
+ {
+ if (log_ident[0])
+ mysql_bin_log.make_log_name(search_file_name, log_ident);
+ else
+ name=0; // Find first log
+ }
linfo.index_file_offset = 0;
@@ -1012,7 +1798,8 @@ impossible position";
if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
log_file_name, &log,
mariadb_slave_capability, ev_offset,
- current_checksum_alg)))
+ current_checksum_alg, using_gtid_state,
+ &gtid_state, &gtid_skip_group)))
{
errmsg= tmp_msg;
my_errno= ER_UNKNOWN_ERROR;
@@ -1105,7 +1892,8 @@ impossible position";
int ret;
ulong signal_cnt;
DBUG_PRINT("wait",("waiting for data in binary log"));
- if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
+ /* For mysqlbinlog (mysqlbinlog.server_id==0). */
+ if (thd->variables.server_id==0)
{
mysql_mutex_unlock(log_lock);
goto end;
@@ -1172,7 +1960,9 @@ impossible position";
(tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
log_file_name, &log,
mariadb_slave_capability, ev_offset,
- current_checksum_alg)))
+ current_checksum_alg,
+ using_gtid_state, &gtid_state,
+ &gtid_skip_group)))
{
errmsg= tmp_msg;
my_errno= ER_UNKNOWN_ERROR;
@@ -1264,6 +2054,22 @@ err:
my_basename(p_coord->file_name), p_coord->pos,
my_basename(log_file_name), my_b_tell(&log));
}
+ else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG)
+ {
+ my_snprintf(error_text, sizeof(error_text),
+ "Error: connecting slave requested to start from GTID "
+ "%u-%u-%llu, which is not in the master's binlog",
+ error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
+ /* Use this error code so slave will know not to try reconnect. */
+ my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ }
+ else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE)
+ {
+ my_snprintf(error_text, sizeof(error_text),
+ "Failed to load replication slave GTID state from table %s.%s",
+ "mysql", rpl_gtid_slave_state_table_name.str);
+ my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ }
else
strcpy(error_text, errmsg);
end_io_cache(&log);
@@ -1628,7 +2434,7 @@ void kill_zombie_dump_threads(uint32 slave_server_id)
while ((tmp=it++))
{
if (tmp->get_command() == COM_BINLOG_DUMP &&
- tmp->server_id == slave_server_id)
+ tmp->variables.server_id == slave_server_id)
{
mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete
break;
@@ -1837,7 +2643,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
{
ulong s_id;
get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i);
- if (s_id == ::server_id && replicate_same_server_id)
+ if (s_id == global_system_variables.server_id && replicate_same_server_id)
{
my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id));
ret= TRUE;
@@ -1902,6 +2708,13 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
}
+ if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_MI_ENABLE)
+ mi->using_gtid= true;
+ else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_MI_DISABLE ||
+ lex_mi->log_file_name || lex_mi->pos ||
+ lex_mi->relay_log_name || lex_mi->relay_log_pos)
+ mi->using_gtid= false;
+
/*
If user did specify neither host nor port nor any log name nor any log
pos, i.e. he specified only user/password/master_connect_retry, he probably
@@ -1932,6 +2745,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
strmake(mi->master_log_name, mi->rli.group_master_log_name,
sizeof(mi->master_log_name)-1);
}
+
/*
Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
a slave before).
@@ -1954,6 +2768,16 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
ret= TRUE;
goto err;
}
+
+ if (mi->using_gtid)
+ {
+ /*
+ Clear the position in the master binlogs, so that we request the
+ correct GTID position.
+ */
+ mi->master_log_name[0]= 0;
+ mi->master_log_pos= 0;
+ }
}
else
{
@@ -2426,4 +3250,139 @@ int log_loaded_block(IO_CACHE* file)
DBUG_RETURN(0);
}
+
+/**
+ Initialise the slave replication state from the mysql.rpl_slave_state table.
+
+ This is called each time an SQL thread starts, but the data is only actually
+ loaded on the first call.
+
+ The slave state is the last GTID applied on the slave within each
+ replication domain.
+
+ To avoid row lock contention, there are multiple rows for each domain_id.
+ The one containing the current slave state is the one with the maximal
+ sub_id value, within each domain_id.
+
+ CREATE TABLE mysql.rpl_slave_state (
+ domain_id INT UNSIGNED NOT NULL,
+ sub_id BIGINT UNSIGNED NOT NULL,
+ server_id INT UNSIGNED NOT NULL,
+ seq_no BIGINT UNSIGNED NOT NULL,
+ PRIMARY KEY (domain_id, sub_id))
+*/
+
+void
+rpl_init_gtid_slave_state()
+{
+ rpl_global_gtid_slave_state.init();
+}
+
+
+void
+rpl_deinit_gtid_slave_state()
+{
+ rpl_global_gtid_slave_state.deinit();
+}
+
+
+/*
+ Format the current GTID state as a string, for use when connecting to a
+ master server with GTID, or for returning the value of @@global.gtid_state.
+
+ If the flag use_binlog is true, then the contents of the binary log (if
+ enabled) is merged into the current GTID state.
+*/
+int
+rpl_append_gtid_state(String *dest, bool use_binlog)
+{
+ int err;
+ rpl_gtid *gtid_list= NULL;
+ uint32 num_gtids= 0;
+
+ if (opt_bin_log &&
+ (err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
+ return err;
+
+ rpl_global_gtid_slave_state.tostring(dest, gtid_list, num_gtids);
+ my_free(gtid_list);
+
+ return 0;
+}
+
+
+bool
+rpl_gtid_pos_check(char *str, size_t len)
+{
+ slave_connection_state tmp_slave_state;
+
+ /* Check that we can parse the supplied string. */
+ if (tmp_slave_state.load(str, len))
+ return true;
+
+ /*
+ Check our own binlog for any of our own transactions that are newer
+ than the GTID state the user is requesting. Any such transactions would
+ result in an out-of-order binlog, which could break anyone replicating
+ with us as master.
+
+ So give an error if this is found, requesting the user to do a
+ RESET MASTER (to clean up the binlog) if they really want this.
+ */
+ if (mysql_bin_log.is_open())
+ {
+ rpl_gtid *binlog_gtid_list= NULL;
+ uint32 num_binlog_gtids= 0;
+ uint32 i;
+
+ if (mysql_bin_log.get_most_recent_gtid_list(&binlog_gtid_list,
+ &num_binlog_gtids))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+ return true;
+ }
+ for (i= 0; i < num_binlog_gtids; ++i)
+ {
+ rpl_gtid *binlog_gtid= &binlog_gtid_list[i];
+ rpl_gtid *slave_gtid;
+ if (binlog_gtid->server_id != global_system_variables.server_id)
+ continue;
+ if (!(slave_gtid= tmp_slave_state.find(binlog_gtid->domain_id)))
+ {
+ my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0),
+ binlog_gtid->domain_id, binlog_gtid->domain_id,
+ binlog_gtid->server_id, binlog_gtid->seq_no);
+ break;
+ }
+ if (slave_gtid->seq_no < binlog_gtid->seq_no)
+ {
+ my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0),
+ slave_gtid->domain_id, slave_gtid->server_id,
+ slave_gtid->seq_no, binlog_gtid->domain_id,
+ binlog_gtid->server_id, binlog_gtid->seq_no);
+ break;
+ }
+ }
+ my_free(binlog_gtid_list);
+ if (i != num_binlog_gtids)
+ return true;
+ }
+
+ return false;
+}
+
+
+bool
+rpl_gtid_pos_update(THD *thd, char *str, size_t len)
+{
+ if (rpl_global_gtid_slave_state.load(thd, str, len, true))
+ {
+ my_error(ER_FAILED_GTID_STATE_INIT, MYF(0));
+ return true;
+ }
+ else
+ return false;
+}
+
+
#endif /* HAVE_REPLICATION */
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 9ca7e6b00b1..3af8f721bd7 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -65,6 +65,14 @@ int log_loaded_block(IO_CACHE* file);
int init_replication_sys_vars();
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
+extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
+void rpl_init_gtid_slave_state();
+void rpl_deinit_gtid_slave_state();
+int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
+int rpl_append_gtid_state(String *dest, bool use_binlog);
+bool rpl_gtid_pos_check(char *str, size_t len);
+bool rpl_gtid_pos_update(THD *thd, char *str, size_t len);
+
#endif /* HAVE_REPLICATION */
#endif /* SQL_REPL_INCLUDED */
diff --git a/sql/sql_show.cc b/sql/sql_show.cc
index 84da93a2131..48fb926bfcc 100644
--- a/sql/sql_show.cc
+++ b/sql/sql_show.cc
@@ -7837,16 +7837,22 @@ int make_schema_select(THD *thd, SELECT_LEX *sel,
We have to make non const db_name & table_name
because of lower_case_table_names
*/
- thd->make_lex_string(&db, INFORMATION_SCHEMA_NAME.str,
- INFORMATION_SCHEMA_NAME.length);
- thd->make_lex_string(&table, schema_table->table_name,
- strlen(schema_table->table_name));
- if (schema_table->old_format(thd, schema_table) || /* Handle old syntax */
- !sel->add_table_to_list(thd, new Table_ident(thd, db, table, 0),
+ if (!thd->make_lex_string(&db, INFORMATION_SCHEMA_NAME.str,
+ INFORMATION_SCHEMA_NAME.length))
+ DBUG_RETURN(1);
+
+ if (!thd->make_lex_string(&table, schema_table->table_name,
+ strlen(schema_table->table_name)))
+ DBUG_RETURN(1);
+
+ if (schema_table->old_format(thd, schema_table))
+
+ DBUG_RETURN(1);
+
+ if (!sel->add_table_to_list(thd, new Table_ident(thd, db, table, 0),
0, 0, TL_READ, MDL_SHARED_READ))
- {
DBUG_RETURN(1);
- }
+
DBUG_RETURN(0);
}
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index fdd90d95b2d..447ccf5f469 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -945,6 +945,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token AUTHORS_SYM
%token AUTOEXTEND_SIZE_SYM
%token AUTO_INC
+%token AUTO_SYM
%token AVG_ROW_LENGTH
%token AVG_SYM /* SQL-2003-N */
%token BACKUP_SYM
@@ -1204,6 +1205,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
%token LOW_PRIORITY
%token LT /* OPERATOR */
%token MASTER_CONNECT_RETRY_SYM
+%token MASTER_USE_GTID_SYM
%token MASTER_HOST_SYM
%token MASTER_LOG_FILE_SYM
%token MASTER_LOG_POS_SYM
@@ -2189,6 +2191,16 @@ master_file_def:
/* Adjust if < BIN_LOG_HEADER_SIZE (same comment as Lex->mi.pos) */
Lex->mi.relay_log_pos = max(BIN_LOG_HEADER_SIZE, Lex->mi.relay_log_pos);
}
+ | MASTER_USE_GTID_SYM EQ ulong_num
+ {
+ if (Lex->mi.use_gtid_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
+ {
+ my_error(ER_DUP_ARGUMENT, MYF(0), "MASTER_use_gtid");
+ MYSQL_YYABORT;
+ }
+ Lex->mi.use_gtid_opt= $3 ?
+ LEX_MASTER_INFO::LEX_MI_ENABLE : LEX_MASTER_INFO::LEX_MI_DISABLE;
+ }
;
optional_connection_name:
@@ -13366,6 +13378,7 @@ keyword_sp:
| AUTHORS_SYM {}
| AUTO_INC {}
| AUTOEXTEND_SIZE_SYM {}
+ | AUTO_SYM {}
| AVG_ROW_LENGTH {}
| AVG_SYM {}
| BINLOG_SYM {}
@@ -13476,6 +13489,7 @@ keyword_sp:
| MAX_ROWS {}
| MASTER_SYM {}
| MASTER_HEARTBEAT_PERIOD_SYM {}
+ | MASTER_USE_GTID_SYM {}
| MASTER_HOST_SYM {}
| MASTER_PORT_SYM {}
| MASTER_LOG_FILE_SYM {}
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 6d5d92081a9..314f6781635 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -58,6 +58,7 @@
#include "../storage/perfschema/pfs_server.h"
#endif /* WITH_PERFSCHEMA_STORAGE_ENGINE */
#include "threadpool.h"
+#include "sql_repl.h"
/*
The rule for this file: everything should be 'static'. When a sys_var
@@ -1331,6 +1332,114 @@ static Sys_var_ulong Sys_pseudo_thread_id(
BLOCK_SIZE(1), NO_MUTEX_GUARD, IN_BINLOG,
ON_CHECK(check_has_super));
+static Sys_var_uint Sys_gtid_domain_id(
+ "gtid_domain_id",
+ "Used with global transaction ID to identify logically independent "
+ "replication streams. When events can propagate through multiple "
+ "parallel paths (for example multiple masters), each independent "
+ "source server must use a distinct domain_id. For simple tree-shaped "
+ "replication topologies, it can be left at its default, 0.",
+ SESSION_VAR(gtid_domain_id),
+ CMD_LINE(REQUIRED_ARG), VALID_RANGE(0, UINT_MAX32), DEFAULT(0),
+ BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(check_has_super));
+
+static Sys_var_ulonglong Sys_gtid_seq_no(
+ "gtid_seq_no",
+ "Internal server usage, for replication with global transaction id. "
+ "When set, next event group logged to the binary log will use this "
+ "sequence number, not generate a new one, thus allowing to preserve "
+ "master's GTID in slave's binlog.",
+ SESSION_ONLY(gtid_seq_no),
+ NO_CMD_LINE, VALID_RANGE(0, ULONGLONG_MAX), DEFAULT(0),
+ BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(check_has_super));
+
+
+#ifdef HAVE_REPLICATION
+bool
+Sys_var_gtid_pos::do_check(THD *thd, set_var *var)
+{
+ String str, *res;
+ bool running;
+
+ DBUG_ASSERT(var->type == OPT_GLOBAL);
+ mysql_mutex_lock(&LOCK_active_mi);
+ running= master_info_index->give_error_if_slave_running();
+ mysql_mutex_unlock(&LOCK_active_mi);
+ if (running)
+ return true;
+ if (!(res= var->value->val_str(&str)))
+ return true;
+ if (rpl_gtid_pos_check(&((*res)[0]), res->length()))
+ return true;
+
+ if (!(var->save_result.string_value.str=
+ thd->strmake(res->ptr(), res->length())))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return true;
+ }
+ var->save_result.string_value.length= res->length();
+ return false;
+}
+
+
+bool
+Sys_var_gtid_pos::global_update(THD *thd, set_var *var)
+{
+ bool err;
+
+ DBUG_ASSERT(var->type == OPT_GLOBAL);
+
+ if (!var->value)
+ {
+ my_error(ER_NO_DEFAULT, MYF(0), var->var->name.str);
+ return true;
+ }
+
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ mysql_mutex_lock(&LOCK_active_mi);
+ if (master_info_index->give_error_if_slave_running())
+ err= true;
+ else
+ err= rpl_gtid_pos_update(thd, var->save_result.string_value.str,
+ var->save_result.string_value.length);
+ mysql_mutex_unlock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_global_system_variables);
+ return err;
+}
+
+
+uchar *
+Sys_var_gtid_pos::global_value_ptr(THD *thd, LEX_STRING *base)
+{
+ String str;
+ char *p;
+
+ str.length(0);
+ if (rpl_append_gtid_state(&str, true) ||
+ !(p= thd->strmake(str.ptr(), str.length())))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return NULL;
+ }
+
+ return (uchar *)p;
+}
+
+
+static unsigned char opt_gtid_pos_dummy;
+static Sys_var_gtid_pos Sys_gtid_pos(
+ "gtid_pos",
+ "The list of global transaction IDs that were last replicated on the "
+ "server, one for each replication domain. This defines where a slave "
+ "starts replicating from on a master when connecting with global "
+ "transaction ID.",
+ GLOBAL_VAR(opt_gtid_pos_dummy), NO_CMD_LINE);
+#endif
+
+
static bool fix_max_join_size(sys_var *self, THD *thd, enum_var_type type)
{
SV *sv= type == OPT_GLOBAL ? &global_system_variables : &thd->variables;
@@ -2096,17 +2205,27 @@ static Sys_var_charptr Sys_secure_file_priv(
static bool fix_server_id(sys_var *self, THD *thd, enum_var_type type)
{
- server_id_supplied = 1;
- thd->server_id= server_id;
+ if (type == OPT_GLOBAL)
+ {
+ server_id_supplied = 1;
+ thd->variables.server_id= global_system_variables.server_id;
+ /*
+ Historically, server_id was a global variable that is exported to
+ plugins. Now it is a session variable, and lives in the
+ global_system_variables struct, but we still need to export the
+ value for reading to plugins for backwards compatibility reasons.
+ */
+ ::server_id= global_system_variables.server_id;
+ }
return false;
}
static Sys_var_ulong Sys_server_id(
"server_id",
"Uniquely identifies the server instance in the community of "
"replication partners",
- GLOBAL_VAR(server_id), CMD_LINE(REQUIRED_ARG, OPT_SERVER_ID),
+ SESSION_VAR(server_id), CMD_LINE(REQUIRED_ARG, OPT_SERVER_ID),
VALID_RANGE(0, UINT_MAX32), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD,
- NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(fix_server_id));
+ NOT_IN_BINLOG, ON_CHECK(check_has_super), ON_UPDATE(fix_server_id));
static Sys_var_mybool Sys_slave_compressed_protocol(
"slave_compressed_protocol",
@@ -3546,6 +3665,7 @@ get_master_info_uint_value(THD *thd, ptrdiff_t offset)
{
Master_info *mi;
uint res= 0; // Default value
+ mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi);
mi= master_info_index->
get_master_info(&thd->variables.default_master_connection,
@@ -3557,6 +3677,7 @@ get_master_info_uint_value(THD *thd, ptrdiff_t offset)
mysql_mutex_unlock(&mi->rli.data_lock);
}
mysql_mutex_unlock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_global_system_variables);
return res;
}
@@ -3568,6 +3689,8 @@ bool update_multi_source_variable(sys_var *self_var, THD *thd,
bool result= true;
Master_info *mi;
+ if (type == OPT_GLOBAL)
+ mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi);
mi= master_info_index->
get_master_info(&thd->variables.default_master_connection,
@@ -3581,6 +3704,8 @@ bool update_multi_source_variable(sys_var *self_var, THD *thd,
mysql_mutex_unlock(&mi->rli.run_lock);
}
mysql_mutex_unlock(&LOCK_active_mi);
+ if (type == OPT_GLOBAL)
+ mysql_mutex_lock(&LOCK_global_system_variables);
return result;
}
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index 9d91c4175f8..47cb5a327e1 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -2045,3 +2045,43 @@ public:
}
};
+
+/**
+ Class for @@global.gtid_pos.
+*/
+class Sys_var_gtid_pos: public sys_var
+{
+public:
+ Sys_var_gtid_pos(const char *name_arg,
+ const char *comment, int flag_args, ptrdiff_t off, size_t size,
+ CMD_LINE getopt)
+ : sys_var(&all_sys_vars, name_arg, comment, flag_args, off, getopt.id,
+ getopt.arg_type, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG,
+ NULL, NULL, NULL)
+ {
+ option.var_type= GET_STR;
+ }
+ bool do_check(THD *thd, set_var *var);
+ bool session_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool global_update(THD *thd, set_var *var);
+ bool check_update_type(Item_result type) { return type != STRING_RESULT; }
+ void session_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ void global_save_default(THD *thd, set_var *var)
+ {
+ /* Record the attempt to use default so we can error. */
+ var->value= 0;
+ }
+ uchar *session_value_ptr(THD *thd, LEX_STRING *base)
+ {
+ DBUG_ASSERT(false);
+ return NULL;
+ }
+ uchar *global_value_ptr(THD *thd, LEX_STRING *base);
+};
diff --git a/sql/table.cc b/sql/table.cc
index b8732a78afc..8de3aea3d51 100644
--- a/sql/table.cc
+++ b/sql/table.cc
@@ -3614,6 +3614,46 @@ Table_check_intact::check(TABLE *table, const TABLE_FIELD_DEF *table_def)
}
}
+ if (table_def->primary_key_parts)
+ {
+ if (table->s->primary_key == MAX_KEY)
+ {
+ report_error(0, "Incorrect definition of table %s.%s: "
+ "missing primary key.", table->s->db.str,
+ table->alias.c_ptr());
+ error= TRUE;
+ }
+ else
+ {
+ KEY *pk= &table->s->key_info[table->s->primary_key];
+ if (pk->key_parts != table_def->primary_key_parts)
+ {
+ report_error(0, "Incorrect definition of table %s.%s: "
+ "Expected primary key to have %u columns, but instead "
+ "found %u columns.", table->s->db.str,
+ table->alias.c_ptr(), table_def->primary_key_parts,
+ pk->key_parts);
+ error= TRUE;
+ }
+ else
+ {
+ for (i= 0; i < pk->key_parts; ++i)
+ {
+ if (table_def->primary_key_columns[i] + 1 != pk->key_part[i].fieldnr)
+ {
+ report_error(0, "Incorrect definition of table %s.%s: Expected "
+ "primary key part %u to refer to column %u, but "
+ "instead found column %u.", table->s->db.str,
+ table->alias.c_ptr(), i + 1,
+ table_def->primary_key_columns[i] + 1,
+ pk->key_part[i].fieldnr);
+ error= TRUE;
+ }
+ }
+ }
+ }
+ }
+
if (! error)
table->s->table_field_def_cache= table_def;
diff --git a/sql/table.h b/sql/table.h
index fa5bd83371a..2841b854da1 100644
--- a/sql/table.h
+++ b/sql/table.h
@@ -493,6 +493,8 @@ typedef struct st_table_field_def
{
uint count;
const TABLE_FIELD_TYPE *field;
+ uint primary_key_parts;
+ const uint *primary_key_columns;
} TABLE_FIELD_DEF;