diff options
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r-- | sql/rpl_gtid.cc | 313 |
1 files changed, 249 insertions, 64 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index d6a6ed90bd3..b34b890060b 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -29,7 +29,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name= - { C_STRING_WITH_LEN("rpl_slave_state") }; + { C_STRING_WITH_LEN("gtid_slave_pos") }; void @@ -73,7 +73,7 @@ rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli) if ((sub_id= rli->gtid_sub_id)) { rli->gtid_sub_id= 0; - if (record_gtid(thd, &rli->current_gtid, sub_id, false)) + if (record_gtid(thd, &rli->current_gtid, sub_id, false, false)) return 1; update_state_hash(sub_id, &rli->current_gtid); } @@ -186,8 +186,6 @@ rpl_slave_state::truncate_state_table(THD *thd) int err= 0; TABLE *table; - mysql_reset_thd_for_next_command(thd, 0); - tlist.init_one_table(STRING_WITH_LEN("mysql"), rpl_gtid_slave_state_table_name.str, rpl_gtid_slave_state_table_name.length, @@ -234,7 +232,7 @@ static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= { static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1}; -static const TABLE_FIELD_DEF mysql_rpl_slave_state_tabledef= { +static const TABLE_FIELD_DEF mysql_gtid_slave_pos_tabledef= { array_elements(mysql_rpl_slave_state_coltypes), mysql_rpl_slave_state_coltypes, array_elements(mysql_rpl_slave_state_pk_parts), @@ -256,14 +254,14 @@ protected: static Gtid_db_intact gtid_table_intact; /* - Check that the mysql.rpl_slave_state table has the correct definition. + Check that the mysql.gtid_slave_pos table has the correct definition. */ int gtid_check_rpl_slave_state_table(TABLE *table) { int err; - if ((err= gtid_table_intact.check(table, &mysql_rpl_slave_state_tabledef))) + if ((err= gtid_table_intact.check(table, &mysql_gtid_slave_pos_tabledef))) my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql", rpl_gtid_slave_state_table_name.str); return err; @@ -286,7 +284,7 @@ gtid_check_rpl_slave_state_table(TABLE *table) */ int rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, - bool in_transaction) + bool in_transaction, bool in_statement) { TABLE_LIST tlist; int err= 0; @@ -297,7 +295,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, ulonglong thd_saved_option= thd->variables.option_bits; Query_tables_list lex_backup; - mysql_reset_thd_for_next_command(thd, 0); + if (!in_statement) + mysql_reset_thd_for_next_command(thd, 0); DBUG_EXECUTE_IF("gtid_inject_record_gtid", { @@ -371,7 +370,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, } table->file->ha_index_end(); - mysql_bin_log.bump_seq_no_counter_if_needed(gtid->seq_no); + if(!err && opt_bin_log && + (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id, + gtid->seq_no))) + my_error(ER_OUT_OF_RESOURCES, MYF(0)); end: @@ -626,7 +628,7 @@ gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid) */ int rpl_slave_state::load(THD *thd, char *state_from_master, size_t len, - bool reset) + bool reset, bool in_statement) { char *end= state_from_master + len; @@ -645,7 +647,7 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len, if (gtid_parser_helper(&state_from_master, end, >id) || !(sub_id= next_subid(gtid.domain_id)) || - record_gtid(thd, >id, sub_id, false) || + record_gtid(thd, >id, sub_id, false, in_statement) || update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no)) return 1; if (state_from_master == end) @@ -686,6 +688,7 @@ rpl_binlog_state::rpl_binlog_state() sizeof(uint32), NULL, my_free, HASH_UNIQUE); mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state, MY_MUTEX_INIT_SLOW); + initialized= 1; } @@ -699,11 +702,36 @@ rpl_binlog_state::reset() my_hash_reset(&hash); } -rpl_binlog_state::~rpl_binlog_state() +void rpl_binlog_state::free() +{ + if (initialized) + { + initialized= 0; + reset(); + my_hash_free(&hash); + mysql_mutex_destroy(&LOCK_binlog_state); + } +} + + +bool +rpl_binlog_state::load(struct rpl_gtid *list, uint32 count) { + uint32 i; + reset(); - my_hash_free(&hash); - mysql_mutex_destroy(&LOCK_binlog_state); + for (i= 0; i < count; ++i) + { + if (update(&(list[i]), false)) + return true; + } + return false; +} + + +rpl_binlog_state::~rpl_binlog_state() +{ + free(); } @@ -716,48 +744,111 @@ rpl_binlog_state::~rpl_binlog_state() Returns 0 for ok, 1 for error. */ int -rpl_binlog_state::update(const struct rpl_gtid *gtid) +rpl_binlog_state::update(const struct rpl_gtid *gtid, bool strict) { - rpl_gtid *lookup_gtid; element *elem; - elem= (element *)my_hash_search(&hash, (const uchar *)(>id->domain_id), 0); - if (elem) + if ((elem= (element *)my_hash_search(&hash, + (const uchar *)(>id->domain_id), 0))) { - /* - By far the most common case is that successive events within same - replication domain have the same server id (it changes only when - switching to a new master). So save a hash lookup in this case. - */ - if (likely(elem->last_gtid->server_id == gtid->server_id)) + if (strict && elem->last_gtid && elem->last_gtid->seq_no >= gtid->seq_no) { - elem->last_gtid->seq_no= gtid->seq_no; - return 0; + my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), gtid->domain_id, + gtid->server_id, gtid->seq_no, elem->last_gtid->domain_id, + elem->last_gtid->server_id, elem->last_gtid->seq_no); + return 1; } + if (elem->seq_no_counter < gtid->seq_no) + elem->seq_no_counter= gtid->seq_no; + if (!elem->update_element(gtid)) + return 0; + } + else if (!alloc_element(gtid)) + return 0; - lookup_gtid= (rpl_gtid *) - my_hash_search(&elem->hash, (const uchar *)>id->server_id, 0); - if (lookup_gtid) - { - lookup_gtid->seq_no= gtid->seq_no; - elem->last_gtid= lookup_gtid; + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; +} + + +/* + Fill in a new GTID, allocating next sequence number, and update state + accordingly. +*/ +int +rpl_binlog_state::update_with_next_gtid(uint32 domain_id, uint32 server_id, + rpl_gtid *gtid) +{ + element *elem; + + gtid->domain_id= domain_id; + gtid->server_id= server_id; + + if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0))) + { + gtid->seq_no= ++elem->seq_no_counter; + if (!elem->update_element(gtid)) return 0; - } + } + else + { + gtid->seq_no= 1; + if (!alloc_element(gtid)) + return 0; + } - /* Allocate a new GTID and insert it. */ - lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); - if (!lookup_gtid) - return 1; - memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); - if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) - { - my_free(lookup_gtid); - return 1; - } - elem->last_gtid= lookup_gtid; + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; +} + + +/* Helper functions for update. */ +int +rpl_binlog_state::element::update_element(const rpl_gtid *gtid) +{ + rpl_gtid *lookup_gtid; + + /* + By far the most common case is that successive events within same + replication domain have the same server id (it changes only when + switching to a new master). So save a hash lookup in this case. + */ + if (likely(last_gtid && last_gtid->server_id == gtid->server_id)) + { + last_gtid->seq_no= gtid->seq_no; return 0; } + lookup_gtid= (rpl_gtid *) + my_hash_search(&hash, (const uchar *)>id->server_id, 0); + if (lookup_gtid) + { + lookup_gtid->seq_no= gtid->seq_no; + last_gtid= lookup_gtid; + return 0; + } + + /* Allocate a new GTID and insert it. */ + lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); + if (!lookup_gtid) + return 1; + memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); + if (my_hash_insert(&hash, (const uchar *)lookup_gtid)) + { + my_free(lookup_gtid); + return 1; + } + last_gtid= lookup_gtid; + return 0; +} + + +int +rpl_binlog_state::alloc_element(const rpl_gtid *gtid) +{ + element *elem; + rpl_gtid *lookup_gtid; + /* First time we see this domain_id; allocate a new element. */ elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)); lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); @@ -768,6 +859,7 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid) offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free, HASH_UNIQUE); elem->last_gtid= lookup_gtid; + elem->seq_no_counter= gtid->seq_no; memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) { @@ -787,23 +879,64 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid) } -uint64 -rpl_binlog_state::seq_no_from_state() +/* + Check that a new GTID can be logged without creating an out-of-order + sequence number with existing GTIDs. +*/ +bool +rpl_binlog_state::check_strict_sequence(uint32 domain_id, uint32 server_id, + uint64 seq_no) { - ulong i, j; - uint64 seq_no= 0; + element *elem; - for (i= 0; i < hash.records; ++i) + if ((elem= (element *)my_hash_search(&hash, + (const uchar *)(&domain_id), 0)) && + elem->last_gtid && elem->last_gtid->seq_no >= seq_no) { - element *e= (element *)my_hash_element(&hash, i); - for (j= 0; j < e->hash.records; ++j) - { - const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j); - if (gtid->seq_no > seq_no) - seq_no= gtid->seq_no; - } + my_error(ER_GTID_STRICT_OUT_OF_ORDER, MYF(0), domain_id, server_id, seq_no, + elem->last_gtid->domain_id, elem->last_gtid->server_id, + elem->last_gtid->seq_no); + return 1; } - return seq_no; + return 0; +} + + +/* + When we see a new GTID that will not be binlogged (eg. slave thread + with --log-slave-updates=0), then we need to remember to allocate any + GTID seq_no of our own within that domain starting from there. + + Returns 0 if ok, non-zero if out-of-memory. +*/ +int +rpl_binlog_state::bump_seq_no_if_needed(uint32 domain_id, uint64 seq_no) +{ + element *elem; + + if ((elem= (element *)my_hash_search(&hash, (const uchar *)(&domain_id), 0))) + { + if (elem->seq_no_counter < seq_no) + elem->seq_no_counter= seq_no; + return 0; + } + + /* We need to allocate a new, empty element to remember the next seq_no. */ + if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)))) + return 1; + + elem->domain_id= domain_id; + my_hash_init(&elem->hash, &my_charset_bin, 32, + offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free, + HASH_UNIQUE); + elem->last_gtid= NULL; + elem->seq_no_counter= seq_no; + if (0 == my_hash_insert(&hash, (const uchar *)elem)) + return 0; + + my_hash_free(&elem->hash); + my_free(elem); + return 1; } @@ -824,6 +957,11 @@ rpl_binlog_state::write_to_iocache(IO_CACHE *dest) { size_t res; element *e= (element *)my_hash_element(&hash, i); + if (!e->last_gtid) + { + DBUG_ASSERT(e->hash.records == 0); + continue; + } for (j= 0; j <= e->hash.records; ++j) { const rpl_gtid *gtid; @@ -865,7 +1003,7 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src) end= buf + res; if (gtid_parser_helper(&p, end, >id)) return 1; - if (update(>id)) + if (update(>id, false)) return 1; } return 0; @@ -881,6 +1019,17 @@ rpl_binlog_state::find(uint32 domain_id, uint32 server_id) return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0); } +rpl_gtid * +rpl_binlog_state::find_most_recent(uint32 domain_id) +{ + element *elem; + + elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0); + if (elem && elem->last_gtid) + return elem->last_gtid; + return NULL; +} + uint32 rpl_binlog_state::count() @@ -904,6 +1053,11 @@ rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) for (i= 0; i < hash.records; ++i) { element *e= (element *)my_hash_element(&hash, i); + if (!e->last_gtid) + { + DBUG_ASSERT(e->hash.records==0); + continue; + } for (j= 0; j <= e->hash.records; ++j) { const rpl_gtid *gtid; @@ -940,20 +1094,44 @@ int rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) { uint32 i; + uint32 alloc_size, out_size; - *size= hash.records; - if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME)))) + alloc_size= hash.records; + if (!(*list= (rpl_gtid *)my_malloc(alloc_size * sizeof(rpl_gtid), + MYF(MY_WME)))) return 1; - for (i= 0; i < *size; ++i) + out_size= 0; + for (i= 0; i < alloc_size; ++i) { element *e= (element *)my_hash_element(&hash, i); - memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid)); + if (!e->last_gtid) + continue; + memcpy(&((*list)[out_size++]), e->last_gtid, sizeof(rpl_gtid)); } + *size= out_size; return 0; } +bool +rpl_binlog_state::append_pos(String *str) +{ + uint32 i; + bool first= true; + + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + if (e->last_gtid && + rpl_slave_state_tostring_helper(str, e->last_gtid, &first)) + return true; + } + + return false; +} + + slave_connection_state::slave_connection_state() { my_hash_init(&hash, &my_charset_bin, 32, @@ -1107,10 +1285,17 @@ slave_connection_state::remove(const rpl_gtid *in_gtid) int slave_connection_state::to_string(String *out_str) { + out_str->length(0); + return append_to_string(out_str); +} + + +int +slave_connection_state::append_to_string(String *out_str) +{ uint32 i; bool first; - out_str->length(0); first= true; for (i= 0; i < hash.records; ++i) { |