summaryrefslogtreecommitdiff
path: root/sql/rpl_gtid.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r--sql/rpl_gtid.cc313
1 files changed, 249 insertions, 64 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index d6a6ed90bd3..b34b890060b 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -29,7 +29,7 @@
const LEX_STRING rpl_gtid_slave_state_table_name=
- { C_STRING_WITH_LEN("rpl_slave_state") };
+ { C_STRING_WITH_LEN("gtid_slave_pos") };
void
@@ -73,7 +73,7 @@ rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli)
if ((sub_id= rli->gtid_sub_id))
{
rli->gtid_sub_id= 0;
- if (record_gtid(thd, &rli->current_gtid, sub_id, false))
+ if (record_gtid(thd, &rli->current_gtid, sub_id, false, false))
return 1;
update_state_hash(sub_id, &rli->current_gtid);
}
@@ -186,8 +186,6 @@ rpl_slave_state::truncate_state_table(THD *thd)
int err= 0;
TABLE *table;
- mysql_reset_thd_for_next_command(thd, 0);
-
tlist.init_one_table(STRING_WITH_LEN("mysql"),
rpl_gtid_slave_state_table_name.str,
rpl_gtid_slave_state_table_name.length,
@@ -234,7 +232,7 @@ static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};
-static const TABLE_FIELD_DEF mysql_rpl_slave_state_tabledef= {
+static const TABLE_FIELD_DEF mysql_gtid_slave_pos_tabledef= {
array_elements(mysql_rpl_slave_state_coltypes),
mysql_rpl_slave_state_coltypes,
array_elements(mysql_rpl_slave_state_pk_parts),
@@ -256,14 +254,14 @@ protected:
static Gtid_db_intact gtid_table_intact;
/*
- Check that the mysql.rpl_slave_state table has the correct definition.
+ Check that the mysql.gtid_slave_pos table has the correct definition.
*/
int
gtid_check_rpl_slave_state_table(TABLE *table)
{
int err;
- if ((err= gtid_table_intact.check(table, &mysql_rpl_slave_state_tabledef)))
+ if ((err= gtid_table_intact.check(table, &mysql_gtid_slave_pos_tabledef)))
my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
rpl_gtid_slave_state_table_name.str);
return err;
@@ -286,7 +284,7 @@ gtid_check_rpl_slave_state_table(TABLE *table)
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
- bool in_transaction)
+ bool in_transaction, bool in_statement)
{
TABLE_LIST tlist;
int err= 0;
@@ -297,7 +295,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup;
- mysql_reset_thd_for_next_command(thd, 0);
+ if (!in_statement)
+ mysql_reset_thd_for_next_command(thd, 0);
DBUG_EXECUTE_IF("gtid_inject_record_gtid",
{
@@ -371,7 +370,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
}
table->file->ha_index_end();
- mysql_bin_log.bump_seq_no_counter_if_needed(gtid->seq_no);
+ if(!err && opt_bin_log &&
+ (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
+ gtid->seq_no)))
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
end:
@@ -626,7 +628,7 @@ gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid)
*/
int
rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
- bool reset)
+ bool reset, bool in_statement)
{
char *end= state_from_master + len;
@@ -645,7 +647,7 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
if (gtid_parser_helper(&state_from_master, end, &gtid) ||
!(sub_id= next_subid(gtid.domain_id)) ||
- record_gtid(thd, &gtid, sub_id, false) ||
+ record_gtid(thd, &gtid, sub_id, false, in_statement) ||
update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no))
return 1;
if (state_from_master == end)
@@ -686,6 +688,7 @@ rpl_binlog_state::rpl_binlog_state()
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
MY_MUTEX_INIT_SLOW);
+ initialized= 1;
}
@@ -699,11 +702,36 @@ rpl_binlog_state::reset()
my_hash_reset(&hash);
}
-rpl_binlog_state::~rpl_binlog_state()
+void rpl_binlog_state::free()
+{
+ if (initialized)
+ {
+ initialized= 0;
+ reset();
+ my_hash_free(&hash);
+ mysql_mutex_destroy(&LOCK_binlog_state);
+ }
+}
+
+
+bool
+rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
{
+ uint32 i;
+
reset();
- my_hash_free(&hash);
- mysql_mutex_destroy(&LOCK_binlog_state);
+ for (i= 0; i < count; ++i)
+ {
+ if (update(&(list[i]), false))
+ return true;
+ }
+ return false;
+}
+
+
+rpl_binlog_state::~rpl_binlog_state()
+{
+ free();
}
@@ -716,48 +744,111 @@ rpl_binlog_state::~rpl_binlog_state()
Returns 0 for ok, 1 for error.
*/
int
-rpl_binlog_state::update(const struct rpl_gtid *gtid)
+rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict)
{
- rpl_gtid *lookup_gtid;
element *elem;
- elem= (element *)my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0);
- if (elem)
+ if ((elem= (element *)my_hash_search(&hash,
+ (const uchar *)(&gtid->domain_id), 0)))
{
- /*
- By far the most common case is that successive events within same
- replication domain have the same server id (it changes only when
- switching to a new master). So save a hash lookup in this case.
- */
- if (likely(elem->last_gtid->server_id == gtid->server_id))
+ if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no)
{
- elem->last_gtid->seq_no= gtid->seq_no;
- return 0;
+ my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id,
+ gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id,
+ elem->last_gtid->server_id, elem->last_gtid->seq_no);
+ return 1;
}
+ if (elem->seq_no_counter < gtid->seq_no)
+ elem->seq_no_counter= gtid->seq_no;
+ if (!elem->update_element(gtid))
+ return 0;
+ }
+ else if (!alloc_element(gtid))
+ return 0;
- lookup_gtid= (rpl_gtid *)
- my_hash_search(&elem->hash, (const uchar *)&gtid->server_id, 0);
- if (lookup_gtid)
- {
- lookup_gtid->seq_no= gtid->seq_no;
- elem->last_gtid= lookup_gtid;
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+}
+
+
+/*
+ Fill in a new GTID, allocating next sequence number, and update state
+ accordingly.
+*/
+int
+rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id,
+ rpl_gtid *gtid)
+{
+ element *elem;
+
+ gtid->domain_id= domain_id;
+ gtid->server_id= server_id;
+
+ if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
+ {
+ gtid->seq_no= ++elem->seq_no_counter;
+ if (!elem->update_element(gtid))
return 0;
- }
+ }
+ else
+ {
+ gtid->seq_no= 1;
+ if (!alloc_element(gtid))
+ return 0;
+ }
- /* Allocate a new GTID and insert it. */
- lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
- if (!lookup_gtid)
- return 1;
- memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
- if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
- {
- my_free(lookup_gtid);
- return 1;
- }
- elem->last_gtid= lookup_gtid;
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+}
+
+
+/* Helper functions for update. */
+int
+rpl_binlog_state::element::update_element(const rpl_gtid *gtid)
+{
+ rpl_gtid *lookup_gtid;
+
+ /*
+ By far the most common case is that successive events within same
+ replication domain have the same server id (it changes only when
+ switching to a new master). So save a hash lookup in this case.
+ */
+ if (likely(last_gtid && last_gtid->server_id == gtid->server_id))
+ {
+ last_gtid->seq_no= gtid->seq_no;
return 0;
}
+ lookup_gtid= (rpl_gtid *)
+ my_hash_search(&hash, (const uchar *)&gtid->server_id, 0);
+ if (lookup_gtid)
+ {
+ lookup_gtid->seq_no= gtid->seq_no;
+ last_gtid= lookup_gtid;
+ return 0;
+ }
+
+ /* Allocate a new GTID and insert it. */
+ lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
+ if (!lookup_gtid)
+ return 1;
+ memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
+ if (my_hash_insert(&hash, (const uchar *)lookup_gtid))
+ {
+ my_free(lookup_gtid);
+ return 1;
+ }
+ last_gtid= lookup_gtid;
+ return 0;
+}
+
+
+int
+rpl_binlog_state::alloc_element(const rpl_gtid *gtid)
+{
+ element *elem;
+ rpl_gtid *lookup_gtid;
+
/* First time we see this domain_id; allocate a new element. */
elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
@@ -768,6 +859,7 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid)
offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
HASH_UNIQUE);
elem->last_gtid= lookup_gtid;
+ elem->seq_no_counter= gtid->seq_no;
memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
{
@@ -787,23 +879,64 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid)
}
-uint64
-rpl_binlog_state::seq_no_from_state()
+/*
+ Check that a new GTID can be logged without creating an out-of-order
+ sequence number with existing GTIDs.
+*/
+bool
+rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id,
+ uint64 seq_no)
{
- ulong i, j;
- uint64 seq_no= 0;
+ element *elem;
- for (i= 0; i < hash.records; ++i)
+ if ((elem= (element *)my_hash_search(&hash,
+ (const uchar *)(&domain_id), 0)) &&
+ elem->last_gtid && elem->last_gtid->seq_no >= seq_no)
{
- element *e= (element *)my_hash_element(&hash, i);
- for (j= 0; j < e->hash.records; ++j)
- {
- const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
- if (gtid->seq_no > seq_no)
- seq_no= gtid->seq_no;
- }
+ my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no,
+ elem->last_gtid->domain_id, elem->last_gtid->server_id,
+ elem->last_gtid->seq_no);
+ return 1;
}
- return seq_no;
+ return 0;
+}
+
+
+/*
+ When we see a new GTID that will not be binlogged (eg. slave thread
+ with --log-slave-updates=0), then we need to remember to allocate any
+ GTID seq_no of our own within that domain starting from there.
+
+ Returns 0 if ok, non-zero if out-of-memory.
+*/
+int
+rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no)
+{
+ element *elem;
+
+ if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0)))
+ {
+ if (elem->seq_no_counter < seq_no)
+ elem->seq_no_counter= seq_no;
+ return 0;
+ }
+
+ /* We need to allocate a new, empty element to remember the next seq_no. */
+ if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
+ return 1;
+
+ elem->domain_id= domain_id;
+ my_hash_init(&elem->hash, &my_charset_bin, 32,
+ offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
+ HASH_UNIQUE);
+ elem->last_gtid= NULL;
+ elem->seq_no_counter= seq_no;
+ if (0 == my_hash_insert(&hash, (const uchar *)elem))
+ return 0;
+
+ my_hash_free(&elem->hash);
+ my_free(elem);
+ return 1;
}
@@ -824,6 +957,11 @@ rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
{
size_t res;
element *e= (element *)my_hash_element(&hash, i);
+ if (!e->last_gtid)
+ {
+ DBUG_ASSERT(e->hash.records == 0);
+ continue;
+ }
for (j= 0; j <= e->hash.records; ++j)
{
const rpl_gtid *gtid;
@@ -865,7 +1003,7 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src)
end= buf + res;
if (gtid_parser_helper(&p, end, &gtid))
return 1;
- if (update(&gtid))
+ if (update(&gtid, false))
return 1;
}
return 0;
@@ -881,6 +1019,17 @@ rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0);
}
+rpl_gtid *
+rpl_binlog_state::find_most_recent(uint32 domain_id)
+{
+ element *elem;
+
+ elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
+ if (elem && elem->last_gtid)
+ return elem->last_gtid;
+ return NULL;
+}
+
uint32
rpl_binlog_state::count()
@@ -904,6 +1053,11 @@ rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
+ if (!e->last_gtid)
+ {
+ DBUG_ASSERT(e->hash.records==0);
+ continue;
+ }
for (j= 0; j <= e->hash.records; ++j)
{
const rpl_gtid *gtid;
@@ -940,20 +1094,44 @@ int
rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
{
uint32 i;
+ uint32 alloc_size, out_size;
- *size= hash.records;
- if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME))))
+ alloc_size= hash.records;
+ if (!(*list= (rpl_gtid *)my_malloc(alloc_size * sizeof(rpl_gtid),
+ MYF(MY_WME))))
return 1;
- for (i= 0; i < *size; ++i)
+ out_size= 0;
+ for (i= 0; i < alloc_size; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
- memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid));
+ if (!e->last_gtid)
+ continue;
+ memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid));
}
+ *size= out_size;
return 0;
}
+bool
+rpl_binlog_state::append_pos(String *str)
+{
+ uint32 i;
+ bool first= true;
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ if (e->last_gtid &&
+ rpl_slave_state_tostring_helper(str, e->last_gtid, &first))
+ return true;
+ }
+
+ return false;
+}
+
+
slave_connection_state::slave_connection_state()
{
my_hash_init(&hash, &my_charset_bin, 32,
@@ -1107,10 +1285,17 @@ slave_connection_state::remove(const rpl_gtid *in_gtid)
int
slave_connection_state::to_string(String *out_str)
{
+ out_str->length(0);
+ return append_to_string(out_str);
+}
+
+
+int
+slave_connection_state::append_to_string(String *out_str)
+{
uint32 i;
bool first;
- out_str->length(0);
first= true;
for (i= 0; i < hash.records; ++i)
{