diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-03-11 16:02:40 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-03-11 16:02:40 +0100 |
commit | 5e414f6b06a88477a25407ddb19ec3c9082aa5d5 (patch) | |
tree | fb32b7443c1c215cbc51a5ebc6177640e5ba7be4 | |
parent | 320863530f2038fc8783d30d43a02452c21a186d (diff) | |
download | mariadb-git-5e414f6b06a88477a25407ddb19ec3c9082aa5d5.tar.gz |
MDEV-26: Global Transaction ID.
Move a bunch of GTID specific code into new file rpl_gtid.cc. Make libmysqld build.
-rw-r--r-- | libmysqld/CMakeLists.txt | 1 | ||||
-rw-r--r-- | sql/CMakeLists.txt | 1 | ||||
-rw-r--r-- | sql/item_strfunc.cc | 6 | ||||
-rw-r--r-- | sql/log_event.cc | 1011 | ||||
-rw-r--r-- | sql/log_event.h | 145 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 1035 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 170 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 3 | ||||
-rw-r--r-- | sql/rpl_rli.h | 1 |
9 files changed, 1215 insertions, 1158 deletions
diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt index 0f08ecd76bb..07a0551b4ab 100644 --- a/libmysqld/CMakeLists.txt +++ b/libmysqld/CMakeLists.txt @@ -98,6 +98,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc ../sql/rpl_reporting.cc ../sql/sql_expression_cache.cc ../sql/my_apc.cc ../sql/my_apc.h + ../sql/rpl_gtid.cc ${GEN_SOURCES} ${MYSYS_LIBWRAP_SOURCE} ) diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 4aa7b8f882e..c6489a45d77 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -88,6 +88,7 @@ SET (SQL_SOURCE threadpool_common.cc ../sql-common/mysql_async.c my_apc.cc my_apc.h + rpl_gtid.cc ${GEN_SOURCES} ${MYSYS_LIBWRAP_SOURCE} ) diff --git a/sql/item_strfunc.cc b/sql/item_strfunc.cc index 79ed795fe3e..3f187ec2af7 100644 --- a/sql/item_strfunc.cc +++ b/sql/item_strfunc.cc @@ -2737,6 +2737,11 @@ void Item_func_binlog_gtid_pos::fix_length_and_dec() String *Item_func_binlog_gtid_pos::val_str(String *str) { DBUG_ASSERT(fixed == 1); +#ifndef HAVE_REPLICATION + null_value= 0; + str->copy("", 0, system_charset_info); + return str; +#else String name_str, *name; longlong pos; @@ -2757,6 +2762,7 @@ String *Item_func_binlog_gtid_pos::val_str(String *str) err: null_value= 1; return NULL; +#endif /* !HAVE_REPLICATION */ } diff --git a/sql/log_event.cc b/sql/log_event.cc index 4431afa11e1..1309907e91f 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -3726,54 +3726,6 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error) } -void -rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid) -{ - int err; - /* - Add the gtid to the HASH in the replication slave state. - - We must do this only _after_ commit, so that for parallel replication, - there will not be an attempt to delete the corresponding table row before - it is even committed. - */ - lock(); - err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no); - unlock(); - if (err) - { - sql_print_warning("Slave: Out of memory during slave state maintenance. " - "Some no longer necessary rows in table " - "mysql.rpl_slave_state may be left undeleted."); - /* - Such failure is not fatal. We will fail to delete the row for this - GTID, but it will do no harm and will be removed automatically on next - server restart. - */ - } -} - - -int -rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli) -{ - uint64 sub_id; - - /* - Update the GTID position, if we have it and did not already update - it in a GTID transaction. - */ - if ((sub_id= rli->gtid_sub_id)) - { - rli->gtid_sub_id= 0; - if (record_gtid(thd, &rli->current_gtid, sub_id, false)) - return 1; - update_state_hash(sub_id, &rli->current_gtid); - } - return 0; -} - - /** @todo Compare the values of "affected rows" around here. Something @@ -6142,897 +6094,6 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file) Global transaction ID stuff **************************************************************************/ -rpl_slave_state::rpl_slave_state() - : inited(false), loaded(false) -{ - my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id), - sizeof(uint32), NULL, my_free, HASH_UNIQUE); -} - - -rpl_slave_state::~rpl_slave_state() -{ -} - -#ifdef MYSQL_SERVER -void -rpl_slave_state::init() -{ - DBUG_ASSERT(!inited); - mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW); - inited= true; -} - - -void -rpl_slave_state::truncate_hash() -{ - uint32 i; - - for (i= 0; i < hash.records; ++i) - { - element *e= (element *)my_hash_element(&hash, i); - list_element *l= e->list; - list_element *next; - while (l) - { - next= l->next; - my_free(l); - l= next; - } - /* The element itself is freed by the hash element free function. */ - } - my_hash_reset(&hash); -} - -void -rpl_slave_state::deinit() -{ - if (!inited) - return; - truncate_hash(); - my_hash_free(&hash); - mysql_mutex_destroy(&LOCK_slave_state); -} -#endif - - -int -rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, - uint64 seq_no) -{ - element *elem= NULL; - list_element *list_elem= NULL; - - if (!(elem= get_element(domain_id))) - return 1; - - if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) - return 1; - list_elem->server_id= server_id; - list_elem->sub_id= sub_id; - list_elem->seq_no= seq_no; - - elem->add(list_elem); - return 0; -} - - -struct rpl_slave_state::element * -rpl_slave_state::get_element(uint32 domain_id) -{ - struct element *elem; - - elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0); - if (elem) - return elem; - - if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)))) - return NULL; - elem->list= NULL; - elem->last_sub_id= 0; - elem->domain_id= domain_id; - if (my_hash_insert(&hash, (uchar *)elem)) - { - my_free(elem); - return NULL; - } - return elem; -} - - -#ifdef MYSQL_SERVER -#ifdef HAVE_REPLICATION -int -rpl_slave_state::truncate_state_table(THD *thd) -{ - TABLE_LIST tlist; - int err= 0; - TABLE *table; - - mysql_reset_thd_for_next_command(thd, 0); - - tlist.init_one_table(STRING_WITH_LEN("mysql"), - rpl_gtid_slave_state_table_name.str, - rpl_gtid_slave_state_table_name.length, - NULL, TL_WRITE); - if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0))) - { - table= tlist.table; - table->no_replicate= 1; - err= table->file->ha_truncate(); - - if (err) - { - ha_rollback_trans(thd, FALSE); - close_thread_tables(thd); - ha_rollback_trans(thd, TRUE); - } - else - { - ha_commit_trans(thd, FALSE); - close_thread_tables(thd); - ha_commit_trans(thd, TRUE); - } - } - - return err; -} - - -static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= { - { { C_STRING_WITH_LEN("domain_id") }, - { C_STRING_WITH_LEN("int(10) unsigned") }, - {NULL, 0} }, - { { C_STRING_WITH_LEN("sub_id") }, - { C_STRING_WITH_LEN("bigint(20) unsigned") }, - {NULL, 0} }, - { { C_STRING_WITH_LEN("server_id") }, - { C_STRING_WITH_LEN("int(10) unsigned") }, - {NULL, 0} }, - { { C_STRING_WITH_LEN("seq_no") }, - { C_STRING_WITH_LEN("bigint(20) unsigned") }, - {NULL, 0} }, -}; - -static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1}; - -static const TABLE_FIELD_DEF mysql_rpl_slave_state_tabledef= { - array_elements(mysql_rpl_slave_state_coltypes), - mysql_rpl_slave_state_coltypes, - array_elements(mysql_rpl_slave_state_pk_parts), - mysql_rpl_slave_state_pk_parts -}; - -class Gtid_db_intact : public Table_check_intact -{ -protected: - void report_error(uint, const char *fmt, ...) - { - va_list args; - va_start(args, fmt); - error_log_print(ERROR_LEVEL, fmt, args); - va_end(args); - } -}; - -static Gtid_db_intact gtid_table_intact; - -/* - Write a gtid to the replication slave state table. - - Do it as part of the transaction, to get slave crash safety, or as a separate - transaction if !in_transaction (eg. MyISAM or DDL). - - gtid The global transaction id for this event group. - sub_id Value allocated within the sub_id when the event group was - read (sub_id must be consistent with commit order in master binlog). - - Note that caller must later ensure that the new gtid and sub_id is inserted - into the appropriate HASH element with rpl_slave_state.add(), so that it can - be deleted later. But this must only be done after COMMIT if in transaction. -*/ -int -rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, - bool in_transaction) -{ - TABLE_LIST tlist; - int err= 0; - bool table_opened= false; - TABLE *table; - list_element *elist= 0, *next; - element *elem; - ulonglong thd_saved_option= thd->variables.option_bits; - - mysql_reset_thd_for_next_command(thd, 0); - - tlist.init_one_table(STRING_WITH_LEN("mysql"), - rpl_gtid_slave_state_table_name.str, - rpl_gtid_slave_state_table_name.length, - NULL, TL_WRITE); - if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) - goto end; - table_opened= true; - table= tlist.table; - - if ((err= gtid_table_intact.check(table, &mysql_rpl_slave_state_tabledef))) - { - my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql", - rpl_gtid_slave_state_table_name.str); - goto end; - } - - table->no_replicate= 1; - if (!in_transaction) - thd->variables.option_bits&= - ~(ulonglong)(OPTION_NOT_AUTOCOMMIT|OPTION_BEGIN); - - /* - ToDo: Check the table definition, error if not as expected. - We need the correct first 4 columns with correct type, and the primary key. - */ - bitmap_set_all(table->write_set); - - table->field[0]->store((ulonglong)gtid->domain_id, true); - table->field[1]->store(sub_id, true); - table->field[2]->store((ulonglong)gtid->server_id, true); - table->field[3]->store(gtid->seq_no, true); - if ((err= table->file->ha_write_row(table->record[0]))) - goto end; - - lock(); - if ((elem= get_element(gtid->domain_id)) == NULL) - { - unlock(); - err= 1; - goto end; - } - elist= elem->grab_list(); - unlock(); - - if (!elist) - goto end; - - /* Now delete any already committed rows. */ - bitmap_set_bit(table->read_set, table->field[0]->field_index); - bitmap_set_bit(table->read_set, table->field[1]->field_index); - - if ((err= table->file->ha_index_init(0, 0))) - goto end; - while (elist) - { - uchar key_buffer[4+8]; - - next= elist->next; - - table->field[1]->store(elist->sub_id, true); - /* domain_id is already set in table->record[0] from write_row() above. */ - key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false); - if ((err= table->file->ha_index_read_map(table->record[1], key_buffer, - HA_WHOLE_KEY, HA_READ_KEY_EXACT)) || - (err= table->file->ha_delete_row(table->record[1]))) - break; - my_free(elist); - elist= next; - } - table->file->ha_index_end(); - -end: - - if (table_opened) - { - if (err) - { - /* - ToDo: If error, we need to put any remaining elist back into the HASH so - we can do another delete attempt later. - */ - ha_rollback_trans(thd, FALSE); - close_thread_tables(thd); - } - else - { - ha_commit_trans(thd, FALSE); - close_thread_tables(thd); - } - } - thd->variables.option_bits= thd_saved_option; - return err; -} - - -uint64 -rpl_slave_state::next_subid(uint32 domain_id) -{ - uint32 sub_id= 0; - element *elem; - - lock(); - elem= get_element(domain_id); - if (elem) - sub_id= ++elem->last_sub_id; - unlock(); - - return sub_id; -} -#endif - - -static -bool -rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first) -{ - if (*first) - *first= false; - else - if (dest->append(",",1)) - return true; - return - dest->append_ulonglong(gtid->domain_id) || - dest->append("-",1) || - dest->append_ulonglong(gtid->server_id) || - dest->append("-",1) || - dest->append_ulonglong(gtid->seq_no); -} - - -/* - Prepare the current slave state as a string, suitable for sending to the - master to request to receive binlog events starting from that GTID state. - - The state consists of the most recently applied GTID for each domain_id, - ie. the one with the highest sub_id within each domain_id. -*/ - -int -rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra) -{ - bool first= true; - uint32 i; - HASH gtid_hash; - uchar *rec; - rpl_gtid *gtid; - int res= 1; - - my_hash_init(>id_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id), - sizeof(uint32), NULL, NULL, HASH_UNIQUE); - for (i= 0; i < num_extra; ++i) - if (my_hash_insert(>id_hash, (uchar *)(&extra_gtids[i]))) - goto err; - - lock(); - - for (i= 0; i < hash.records; ++i) - { - uint64 best_sub_id; - rpl_gtid best_gtid; - element *e= (element *)my_hash_element(&hash, i); - list_element *l= e->list; - - if (!l) - continue; /* Nothing here */ - - best_gtid.domain_id= e->domain_id; - best_gtid.server_id= l->server_id; - best_gtid.seq_no= l->seq_no; - best_sub_id= l->sub_id; - while ((l= l->next)) - { - if (l->sub_id > best_sub_id) - { - best_sub_id= l->sub_id; - best_gtid.server_id= l->server_id; - best_gtid.seq_no= l->seq_no; - } - } - - /* Check if we have something newer in the extra list. */ - rec= my_hash_search(>id_hash, (const uchar *)&best_gtid.domain_id, 0); - if (rec) - { - gtid= (rpl_gtid *)rec; - if (gtid->seq_no > best_gtid.seq_no) - memcpy(&best_gtid, gtid, sizeof(best_gtid)); - if (my_hash_delete(>id_hash, rec)) - { - unlock(); - goto err; - } - } - - if (rpl_slave_state_tostring_helper(dest, &best_gtid, &first)) - { - unlock(); - goto err; - } - } - - unlock(); - - /* Also add any remaining extra domain_ids. */ - for (i= 0; i < gtid_hash.records; ++i) - { - gtid= (rpl_gtid *)my_hash_element(>id_hash, i); - if (rpl_slave_state_tostring_helper(dest, gtid, &first)) - goto err; - } - - res= 0; - -err: - my_hash_free(>id_hash); - - return res; -} - - -/* - Parse a GTID at the start of a string, and update the pointer to point - at the first character after the parsed GTID. - - GTID can be in short form with domain_id=0 implied, SERVERID-SEQNO. - Or long form, DOMAINID-SERVERID-SEQNO. - - Returns 0 on ok, non-zero on parse error. -*/ -static int -gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid) -{ - char *q; - char *p= *ptr; - uint64 v1, v2, v3; - int err= 0; - - q= end; - v1= (uint64)my_strtoll10(p, &q, &err); - if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-') - return 1; - p= q+1; - q= end; - v2= (uint64)my_strtoll10(p, &q, &err); - if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-') - return 1; - p= q+1; - q= end; - v3= (uint64)my_strtoll10(p, &q, &err); - if (err != 0) - return 1; - - out_gtid->domain_id= v1; - out_gtid->server_id= v2; - out_gtid->seq_no= v3; - *ptr= q; - return 0; -} - - -/* - Update the slave replication state with the GTID position obtained from - master when connecting with old-style (filename,offset) position. - - If RESET is true then all existing entries are removed. Otherwise only - domain_ids mentioned in the STATE_FROM_MASTER are changed. - - Returns 0 if ok, non-zero if error. -*/ -int -rpl_slave_state::load(THD *thd, char *state_from_master, size_t len, - bool reset) -{ - char *end= state_from_master + len; - - if (reset) - { - if (truncate_state_table(thd)) - return 1; - truncate_hash(); - } - if (state_from_master == end) - return 0; - for (;;) - { - rpl_gtid gtid; - uint64 sub_id; - - if (gtid_parser_helper(&state_from_master, end, >id) || - !(sub_id= next_subid(gtid.domain_id)) || - record_gtid(thd, >id, sub_id, false) || - update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no)) - return 1; - if (state_from_master == end) - break; - if (*state_from_master != ',') - return 1; - ++state_from_master; - } - return 0; -} - - -bool -rpl_slave_state::is_empty() -{ - uint32 i; - bool result= true; - - lock(); - for (i= 0; i < hash.records; ++i) - { - element *e= (element *)my_hash_element(&hash, i); - if (e->list) - { - result= false; - break; - } - } - unlock(); - - return result; -} - - -rpl_binlog_state::rpl_binlog_state() -{ - my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id), - sizeof(uint32), NULL, my_free, HASH_UNIQUE); - mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state, - MY_MUTEX_INIT_SLOW); -} - - -void -rpl_binlog_state::reset() -{ - uint32 i; - - for (i= 0; i < hash.records; ++i) - my_hash_free(&((element *)my_hash_element(&hash, i))->hash); - my_hash_reset(&hash); -} - -rpl_binlog_state::~rpl_binlog_state() -{ - reset(); - my_hash_free(&hash); - mysql_mutex_destroy(&LOCK_binlog_state); -} - - -/* - Update replication state with a new GTID. - - If the (domain_id, server_id) pair already exists, then the new GTID replaces - the old one for that domain id. Else a new entry is inserted. - - Returns 0 for ok, 1 for error. -*/ -int -rpl_binlog_state::update(const struct rpl_gtid *gtid) -{ - rpl_gtid *lookup_gtid; - element *elem; - - elem= (element *)my_hash_search(&hash, (const uchar *)(>id->domain_id), 0); - if (elem) - { - /* - By far the most common case is that successive events within same - replication domain have the same server id (it changes only when - switching to a new master). So save a hash lookup in this case. - */ - if (likely(elem->last_gtid->server_id == gtid->server_id)) - { - elem->last_gtid->seq_no= gtid->seq_no; - return 0; - } - - lookup_gtid= (rpl_gtid *) - my_hash_search(&elem->hash, (const uchar *)>id->server_id, 0); - if (lookup_gtid) - { - lookup_gtid->seq_no= gtid->seq_no; - elem->last_gtid= lookup_gtid; - return 0; - } - - /* Allocate a new GTID and insert it. */ - lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); - if (!lookup_gtid) - return 1; - memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); - if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) - { - my_free(lookup_gtid); - return 1; - } - elem->last_gtid= lookup_gtid; - return 0; - } - - /* First time we see this domain_id; allocate a new element. */ - elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)); - lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); - if (elem && lookup_gtid) - { - elem->domain_id= gtid->domain_id; - my_hash_init(&elem->hash, &my_charset_bin, 32, - offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free, - HASH_UNIQUE); - elem->last_gtid= lookup_gtid; - memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); - if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) - { - lookup_gtid= NULL; /* Do not free. */ - if (0 == my_hash_insert(&hash, (const uchar *)elem)) - return 0; - } - my_hash_free(&elem->hash); - } - - /* An error. */ - if (elem) - my_free(elem); - if (lookup_gtid) - my_free(lookup_gtid); - return 1; -} - - -uint32 -rpl_binlog_state::seq_no_from_state() -{ - ulong i, j; - uint64 seq_no= 0; - - for (i= 0; i < hash.records; ++i) - { - element *e= (element *)my_hash_element(&hash, i); - for (j= 0; j < e->hash.records; ++j) - { - const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j); - if (gtid->seq_no > seq_no) - seq_no= gtid->seq_no; - } - } - return seq_no; -} - - -/* - Write binlog state to text file, so we can read it in again without having - to scan last binlog file (normal shutdown/startup, not crash recovery). - - The most recent GTID within each domain_id is written after any other GTID - within this domain. -*/ -int -rpl_binlog_state::write_to_iocache(IO_CACHE *dest) -{ - ulong i, j; - char buf[21]; - - for (i= 0; i < hash.records; ++i) - { - size_t res; - element *e= (element *)my_hash_element(&hash, i); - for (j= 0; j <= e->hash.records; ++j) - { - const rpl_gtid *gtid; - if (j < e->hash.records) - { - gtid= (const rpl_gtid *)my_hash_element(&e->hash, j); - if (gtid == e->last_gtid) - continue; - } - else - gtid= e->last_gtid; - - longlong10_to_str(gtid->seq_no, buf, 10); - res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf); - if (res == (size_t) -1) - return 1; - } - } - - return 0; -} - - -int -rpl_binlog_state::read_from_iocache(IO_CACHE *src) -{ - /* 10-digit - 10-digit - 20-digit \n \0 */ - char buf[10+1+10+1+20+1+1]; - char *p, *end; - rpl_gtid gtid; - - reset(); - for (;;) - { - size_t res= my_b_gets(src, buf, sizeof(buf)); - if (!res) - break; - p= buf; - end= buf + res; - if (gtid_parser_helper(&p, end, >id)) - return 1; - if (update(>id)) - return 1; - } - return 0; -} - - -slave_connection_state::slave_connection_state() -{ - my_hash_init(&hash, &my_charset_bin, 32, - offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, my_free, - HASH_UNIQUE); -} - - -slave_connection_state::~slave_connection_state() -{ - my_hash_free(&hash); -} - - -/* - Create a hash from the slave GTID state that is sent to master when slave - connects to start replication. - - The state is sent as <GTID>,<GTID>,...,<GTID>, for example: - - 0-2-112,1-4-1022 - - The state gives for each domain_id the GTID to start replication from for - the corresponding replication stream. So domain_id must be unique. - - Returns 0 if ok, non-zero if error due to malformed input. - - Note that input string is built by slave server, so it will not be incorrect - unless bug/corruption/malicious server. So we just need basic sanity check, - not fancy user-friendly error message. -*/ - -int -slave_connection_state::load(char *slave_request, size_t len) -{ - char *p, *end; - uchar *rec; - rpl_gtid *gtid; - const rpl_gtid *gtid2; - - my_hash_reset(&hash); - p= slave_request; - end= slave_request + len; - if (p == end) - return 0; - for (;;) - { - if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME)))) - { - my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*gtid)); - return 1; - } - gtid= (rpl_gtid *)rec; - if (gtid_parser_helper(&p, end, gtid)) - { - my_free(rec); - my_error(ER_INCORRECT_GTID_STATE, MYF(0)); - return 1; - } - if ((gtid2= (const rpl_gtid *) - my_hash_search(&hash, (const uchar *)(>id->domain_id), 0))) - { - my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id, - gtid->server_id, (ulonglong)gtid->seq_no, gtid2->domain_id, - gtid2->server_id, (ulonglong)gtid2->seq_no, gtid->domain_id); - my_free(rec); - return 1; - } - if (my_hash_insert(&hash, rec)) - { - my_free(rec); - my_error(ER_OUT_OF_RESOURCES, MYF(0)); - return 1; - } - if (p == end) - break; /* Finished. */ - if (*p != ',') - { - my_error(ER_INCORRECT_GTID_STATE, MYF(0)); - return 1; - } - ++p; - } - - return 0; -} - - -int -slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count) -{ - uint32 i; - - my_hash_reset(&hash); - for (i= 0; i < count; ++i) - if (update(>id_list[i])) - return 1; - return 0; -} - - -rpl_gtid * -slave_connection_state::find(uint32 domain_id) -{ - return (rpl_gtid *) my_hash_search(&hash, (const uchar *)(&domain_id), 0); -} - - -int -slave_connection_state::update(const rpl_gtid *in_gtid) -{ - rpl_gtid *new_gtid; - uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0); - if (rec) - { - memcpy(rec, in_gtid, sizeof(*in_gtid)); - return 0; - } - - if (!(new_gtid= (rpl_gtid *)my_malloc(sizeof(*new_gtid), MYF(MY_WME)))) - return 1; - memcpy(new_gtid, in_gtid, sizeof(*new_gtid)); - if (my_hash_insert(&hash, (uchar *)new_gtid)) - { - my_free(new_gtid); - return 1; - } - - return 0; -} - - -void -slave_connection_state::remove(const rpl_gtid *in_gtid) -{ - bool err; - uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0); -#ifndef DBUG_OFF - rpl_gtid *slave_gtid= (rpl_gtid *)rec; - DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */); - DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id); - DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no); -#endif - - err= my_hash_delete(&hash, rec); - DBUG_ASSERT(!err); -} - - -int -slave_connection_state::to_string(String *out_str) -{ - uint32 i; - bool first; - - out_str->length(0); - first= true; - for (i= 0; i < hash.records; ++i) - { - const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i); - if (rpl_slave_state_tostring_helper(out_str, gtid, &first)) - return 1; - } - return 0; -} - - -#endif /* MYSQL_SERVER */ - - Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, const Format_description_log_event *description_event) : Log_event(buf, description_event), seq_no(0) @@ -7284,78 +6345,6 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, } -uint32 -rpl_binlog_state::count() -{ - uint32 c= 0; - uint32 i; - - for (i= 0; i < hash.records; ++i) - c+= ((element *)my_hash_element(&hash, i))->hash.records; - - return c; -} - - -int -rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) -{ - uint32 i, j, pos; - - pos= 0; - for (i= 0; i < hash.records; ++i) - { - element *e= (element *)my_hash_element(&hash, i); - for (j= 0; j <= e->hash.records; ++j) - { - const rpl_gtid *gtid; - if (j < e->hash.records) - { - gtid= (rpl_gtid *)my_hash_element(&e->hash, j); - if (gtid == e->last_gtid) - continue; - } - else - gtid= e->last_gtid; - - if (pos >= list_size) - return 1; - memcpy(>id_list[pos++], gtid, sizeof(*gtid)); - } - } - - return 0; -} - - -/* - Get a list of the most recently binlogged GTID, for each domain_id. - - This can be used when switching from being a master to being a slave, - to know where to start replicating from the new master. - - The returned list must be de-allocated with my_free(). - - Returns 0 for ok, non-zero for out-of-memory. -*/ -int -rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) -{ - uint32 i; - - *size= hash.records; - if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME)))) - return 1; - for (i= 0; i < *size; ++i) - { - element *e= (element *)my_hash_element(&hash, i); - memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid)); - } - - return 0; -} - - #ifdef MYSQL_SERVER Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set) diff --git a/sql/log_event.h b/sql/log_event.h index 89f2d37e714..6b91756cc8a 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -49,6 +49,8 @@ #include "sql_class.h" /* THD */ #endif +#include "rpl_gtid.h" + /* Forward declarations */ class String; @@ -2947,149 +2949,6 @@ public: }; -struct rpl_gtid -{ - uint32 domain_id; - uint32 server_id; - uint64 seq_no; -}; - - -/* - Replication slave state. - - For every independent replication stream (identified by domain_id), this - remembers the last gtid applied on the slave within this domain. - - Since events are always committed in-order within a single domain, this is - sufficient to maintain the state of the replication slave. -*/ -struct rpl_slave_state -{ - /* Elements in the list of GTIDs kept for each domain_id. */ - struct list_element - { - struct list_element *next; - uint64 sub_id; - uint64 seq_no; - uint32 server_id; - }; - - /* Elements in the HASH that hold the state for one domain_id. */ - struct element - { - struct list_element *list; - uint64 last_sub_id; - uint32 domain_id; - - list_element *grab_list() { list_element *l= list; list= NULL; return l; } - void add(list_element *l) - { - l->next= list; - list= l; - if (last_sub_id < l->sub_id) - last_sub_id= l->sub_id; - } - }; - - /* Mapping from domain_id to its element. */ - HASH hash; - /* Mutex protecting access to the state. */ - mysql_mutex_t LOCK_slave_state; - - bool inited; - bool loaded; - - rpl_slave_state(); - ~rpl_slave_state(); - - void init(); - void deinit(); - void truncate_hash(); - ulong count() const { return hash.records; } - int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no); - int truncate_state_table(THD *thd); - int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, - bool in_transaction); - uint64 next_subid(uint32 domain_id); - int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra); - int load(THD *thd, char *state_from_master, size_t len, bool reset); - bool is_empty(); - - void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); } - void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); } - - element *get_element(uint32 domain_id); - - void update_state_hash(uint64 sub_id, rpl_gtid *gtid); - int record_and_update_gtid(THD *thd, Relay_log_info *rli); -}; - - -/* - Binlog state. - This keeps the last GTID written to the binlog for every distinct - (domain_id, server_id) pair. - This will be logged at the start of the next binlog file as a - Gtid_list_log_event; this way, it is easy to find the binlog file - containing a gigen GTID, by simply scanning backwards from the newest - one until a lower seq_no is found in the Gtid_list_log_event at the - start of a binlog for the given domain_id and server_id. - - We also remember the last logged GTID for every domain_id. This is used - to know where to start when a master is changed to a slave. As a side - effect, it also allows to skip a hash lookup in the very common case of - logging a new GTID with same server id as last GTID. -*/ -struct rpl_binlog_state -{ - struct element { - uint32 domain_id; - HASH hash; /* Containing all server_id for one domain_id */ - /* The most recent entry in the hash. */ - rpl_gtid *last_gtid; - }; - /* Mapping from domain_id to collection of elements. */ - HASH hash; - /* Mutex protecting access to the state. */ - mysql_mutex_t LOCK_binlog_state; - - rpl_binlog_state(); - ~rpl_binlog_state(); - - void reset(); - int update(const struct rpl_gtid *gtid); - uint32 seq_no_from_state(); - int write_to_iocache(IO_CACHE *dest); - int read_from_iocache(IO_CACHE *src); - uint32 count(); - int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size); - int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size); -}; - - -/* - Represent the GTID state that a slave connection to a master requests - the master to start sending binlog events from. -*/ -struct slave_connection_state -{ - /* Mapping from domain_id to the GTID requested for that domain. */ - HASH hash; - - slave_connection_state(); - ~slave_connection_state(); - - int load(char *slave_request, size_t len); - int load(const rpl_gtid *gtid_list, uint32 count); - rpl_gtid *find(uint32 domain_id); - int update(const rpl_gtid *in_gtid); - void remove(const rpl_gtid *gtid); - ulong count() const { return hash.records; } - int to_string(String *out_str); -}; - - /** @class Gtid_log_event diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc new file mode 100644 index 00000000000..62c271758ac --- /dev/null +++ b/sql/rpl_gtid.cc @@ -0,0 +1,1035 @@ +/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + + +/* Definitions for MariaDB global transaction ID (GTID). */ + + +#include "sql_priv.h" +#include "my_sys.h" +#include "unireg.h" +#include "my_global.h" +#include "sql_base.h" +#include "sql_parse.h" +#include "key.h" +#include "rpl_gtid.h" +#include "rpl_rli.h" + + +const LEX_STRING rpl_gtid_slave_state_table_name= + { STRING_WITH_LEN("rpl_slave_state") }; + + +void +rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid) +{ + int err; + /* + Add the gtid to the HASH in the replication slave state. + + We must do this only _after_ commit, so that for parallel replication, + there will not be an attempt to delete the corresponding table row before + it is even committed. + */ + lock(); + err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no); + unlock(); + if (err) + { + sql_print_warning("Slave: Out of memory during slave state maintenance. " + "Some no longer necessary rows in table " + "mysql.rpl_slave_state may be left undeleted."); + /* + Such failure is not fatal. We will fail to delete the row for this + GTID, but it will do no harm and will be removed automatically on next + server restart. + */ + } +} + + +int +rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli) +{ + uint64 sub_id; + + /* + Update the GTID position, if we have it and did not already update + it in a GTID transaction. + */ + if ((sub_id= rli->gtid_sub_id)) + { + rli->gtid_sub_id= 0; + if (record_gtid(thd, &rli->current_gtid, sub_id, false)) + return 1; + update_state_hash(sub_id, &rli->current_gtid); + } + return 0; +} + + +rpl_slave_state::rpl_slave_state() + : inited(false), loaded(false) +{ + my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id), + sizeof(uint32), NULL, my_free, HASH_UNIQUE); +} + + +rpl_slave_state::~rpl_slave_state() +{ +} + + +void +rpl_slave_state::init() +{ + DBUG_ASSERT(!inited); + mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW); + inited= true; +} + + +void +rpl_slave_state::truncate_hash() +{ + uint32 i; + + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + list_element *l= e->list; + list_element *next; + while (l) + { + next= l->next; + my_free(l); + l= next; + } + /* The element itself is freed by the hash element free function. */ + } + my_hash_reset(&hash); +} + +void +rpl_slave_state::deinit() +{ + if (!inited) + return; + truncate_hash(); + my_hash_free(&hash); + mysql_mutex_destroy(&LOCK_slave_state); +} + + +int +rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, + uint64 seq_no) +{ + element *elem= NULL; + list_element *list_elem= NULL; + + if (!(elem= get_element(domain_id))) + return 1; + + if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) + return 1; + list_elem->server_id= server_id; + list_elem->sub_id= sub_id; + list_elem->seq_no= seq_no; + + elem->add(list_elem); + return 0; +} + + +struct rpl_slave_state::element * +rpl_slave_state::get_element(uint32 domain_id) +{ + struct element *elem; + + elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0); + if (elem) + return elem; + + if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)))) + return NULL; + elem->list= NULL; + elem->last_sub_id= 0; + elem->domain_id= domain_id; + if (my_hash_insert(&hash, (uchar *)elem)) + { + my_free(elem); + return NULL; + } + return elem; +} + + +int +rpl_slave_state::truncate_state_table(THD *thd) +{ + TABLE_LIST tlist; + int err= 0; + TABLE *table; + + mysql_reset_thd_for_next_command(thd, 0); + + tlist.init_one_table(STRING_WITH_LEN("mysql"), + rpl_gtid_slave_state_table_name.str, + rpl_gtid_slave_state_table_name.length, + NULL, TL_WRITE); + if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + { + table= tlist.table; + table->no_replicate= 1; + err= table->file->ha_truncate(); + + if (err) + { + ha_rollback_trans(thd, FALSE); + close_thread_tables(thd); + ha_rollback_trans(thd, TRUE); + } + else + { + ha_commit_trans(thd, FALSE); + close_thread_tables(thd); + ha_commit_trans(thd, TRUE); + } + } + + return err; +} + + +static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= { + { { C_STRING_WITH_LEN("domain_id") }, + { C_STRING_WITH_LEN("int(10) unsigned") }, + {NULL, 0} }, + { { C_STRING_WITH_LEN("sub_id") }, + { C_STRING_WITH_LEN("bigint(20) unsigned") }, + {NULL, 0} }, + { { C_STRING_WITH_LEN("server_id") }, + { C_STRING_WITH_LEN("int(10) unsigned") }, + {NULL, 0} }, + { { C_STRING_WITH_LEN("seq_no") }, + { C_STRING_WITH_LEN("bigint(20) unsigned") }, + {NULL, 0} }, +}; + +static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1}; + +static const TABLE_FIELD_DEF mysql_rpl_slave_state_tabledef= { + array_elements(mysql_rpl_slave_state_coltypes), + mysql_rpl_slave_state_coltypes, + array_elements(mysql_rpl_slave_state_pk_parts), + mysql_rpl_slave_state_pk_parts +}; + +class Gtid_db_intact : public Table_check_intact +{ +protected: + void report_error(uint, const char *fmt, ...) + { + va_list args; + va_start(args, fmt); + error_log_print(ERROR_LEVEL, fmt, args); + va_end(args); + } +}; + +static Gtid_db_intact gtid_table_intact; + +/* + Write a gtid to the replication slave state table. + + Do it as part of the transaction, to get slave crash safety, or as a separate + transaction if !in_transaction (eg. MyISAM or DDL). + + gtid The global transaction id for this event group. + sub_id Value allocated within the sub_id when the event group was + read (sub_id must be consistent with commit order in master binlog). + + Note that caller must later ensure that the new gtid and sub_id is inserted + into the appropriate HASH element with rpl_slave_state.add(), so that it can + be deleted later. But this must only be done after COMMIT if in transaction. +*/ +int +rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, + bool in_transaction) +{ + TABLE_LIST tlist; + int err= 0; + bool table_opened= false; + TABLE *table; + list_element *elist= 0, *next; + element *elem; + ulonglong thd_saved_option= thd->variables.option_bits; + + mysql_reset_thd_for_next_command(thd, 0); + + tlist.init_one_table(STRING_WITH_LEN("mysql"), + rpl_gtid_slave_state_table_name.str, + rpl_gtid_slave_state_table_name.length, + NULL, TL_WRITE); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; + + if ((err= gtid_table_intact.check(table, &mysql_rpl_slave_state_tabledef))) + { + my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql", + rpl_gtid_slave_state_table_name.str); + goto end; + } + + table->no_replicate= 1; + if (!in_transaction) + thd->variables.option_bits&= + ~(ulonglong)(OPTION_NOT_AUTOCOMMIT|OPTION_BEGIN); + + /* + ToDo: Check the table definition, error if not as expected. + We need the correct first 4 columns with correct type, and the primary key. + */ + bitmap_set_all(table->write_set); + + table->field[0]->store((ulonglong)gtid->domain_id, true); + table->field[1]->store(sub_id, true); + table->field[2]->store((ulonglong)gtid->server_id, true); + table->field[3]->store(gtid->seq_no, true); + if ((err= table->file->ha_write_row(table->record[0]))) + goto end; + + lock(); + if ((elem= get_element(gtid->domain_id)) == NULL) + { + unlock(); + err= 1; + goto end; + } + elist= elem->grab_list(); + unlock(); + + if (!elist) + goto end; + + /* Now delete any already committed rows. */ + bitmap_set_bit(table->read_set, table->field[0]->field_index); + bitmap_set_bit(table->read_set, table->field[1]->field_index); + + if ((err= table->file->ha_index_init(0, 0))) + goto end; + while (elist) + { + uchar key_buffer[4+8]; + + next= elist->next; + + table->field[1]->store(elist->sub_id, true); + /* domain_id is already set in table->record[0] from write_row() above. */ + key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false); + if ((err= table->file->ha_index_read_map(table->record[1], key_buffer, + HA_WHOLE_KEY, HA_READ_KEY_EXACT)) || + (err= table->file->ha_delete_row(table->record[1]))) + break; + my_free(elist); + elist= next; + } + table->file->ha_index_end(); + +end: + + if (table_opened) + { + if (err) + { + /* + ToDo: If error, we need to put any remaining elist back into the HASH so + we can do another delete attempt later. + */ + ha_rollback_trans(thd, FALSE); + close_thread_tables(thd); + } + else + { + ha_commit_trans(thd, FALSE); + close_thread_tables(thd); + } + } + thd->variables.option_bits= thd_saved_option; + return err; +} + + +uint64 +rpl_slave_state::next_subid(uint32 domain_id) +{ + uint32 sub_id= 0; + element *elem; + + lock(); + elem= get_element(domain_id); + if (elem) + sub_id= ++elem->last_sub_id; + unlock(); + + return sub_id; +} + + +bool +rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first) +{ + if (*first) + *first= false; + else + if (dest->append(",",1)) + return true; + return + dest->append_ulonglong(gtid->domain_id) || + dest->append("-",1) || + dest->append_ulonglong(gtid->server_id) || + dest->append("-",1) || + dest->append_ulonglong(gtid->seq_no); +} + + +/* + Prepare the current slave state as a string, suitable for sending to the + master to request to receive binlog events starting from that GTID state. + + The state consists of the most recently applied GTID for each domain_id, + ie. the one with the highest sub_id within each domain_id. +*/ + +int +rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra) +{ + bool first= true; + uint32 i; + HASH gtid_hash; + uchar *rec; + rpl_gtid *gtid; + int res= 1; + + my_hash_init(>id_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id), + sizeof(uint32), NULL, NULL, HASH_UNIQUE); + for (i= 0; i < num_extra; ++i) + if (my_hash_insert(>id_hash, (uchar *)(&extra_gtids[i]))) + goto err; + + lock(); + + for (i= 0; i < hash.records; ++i) + { + uint64 best_sub_id; + rpl_gtid best_gtid; + element *e= (element *)my_hash_element(&hash, i); + list_element *l= e->list; + + if (!l) + continue; /* Nothing here */ + + best_gtid.domain_id= e->domain_id; + best_gtid.server_id= l->server_id; + best_gtid.seq_no= l->seq_no; + best_sub_id= l->sub_id; + while ((l= l->next)) + { + if (l->sub_id > best_sub_id) + { + best_sub_id= l->sub_id; + best_gtid.server_id= l->server_id; + best_gtid.seq_no= l->seq_no; + } + } + + /* Check if we have something newer in the extra list. */ + rec= my_hash_search(>id_hash, (const uchar *)&best_gtid.domain_id, 0); + if (rec) + { + gtid= (rpl_gtid *)rec; + if (gtid->seq_no > best_gtid.seq_no) + memcpy(&best_gtid, gtid, sizeof(best_gtid)); + if (my_hash_delete(>id_hash, rec)) + { + unlock(); + goto err; + } + } + + if (rpl_slave_state_tostring_helper(dest, &best_gtid, &first)) + { + unlock(); + goto err; + } + } + + unlock(); + + /* Also add any remaining extra domain_ids. */ + for (i= 0; i < gtid_hash.records; ++i) + { + gtid= (rpl_gtid *)my_hash_element(>id_hash, i); + if (rpl_slave_state_tostring_helper(dest, gtid, &first)) + goto err; + } + + res= 0; + +err: + my_hash_free(>id_hash); + + return res; +} + + +/* + Parse a GTID at the start of a string, and update the pointer to point + at the first character after the parsed GTID. + + GTID can be in short form with domain_id=0 implied, SERVERID-SEQNO. + Or long form, DOMAINID-SERVERID-SEQNO. + + Returns 0 on ok, non-zero on parse error. +*/ +static int +gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid) +{ + char *q; + char *p= *ptr; + uint64 v1, v2, v3; + int err= 0; + + q= end; + v1= (uint64)my_strtoll10(p, &q, &err); + if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-') + return 1; + p= q+1; + q= end; + v2= (uint64)my_strtoll10(p, &q, &err); + if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-') + return 1; + p= q+1; + q= end; + v3= (uint64)my_strtoll10(p, &q, &err); + if (err != 0) + return 1; + + out_gtid->domain_id= v1; + out_gtid->server_id= v2; + out_gtid->seq_no= v3; + *ptr= q; + return 0; +} + + +/* + Update the slave replication state with the GTID position obtained from + master when connecting with old-style (filename,offset) position. + + If RESET is true then all existing entries are removed. Otherwise only + domain_ids mentioned in the STATE_FROM_MASTER are changed. + + Returns 0 if ok, non-zero if error. +*/ +int +rpl_slave_state::load(THD *thd, char *state_from_master, size_t len, + bool reset) +{ + char *end= state_from_master + len; + + if (reset) + { + if (truncate_state_table(thd)) + return 1; + truncate_hash(); + } + if (state_from_master == end) + return 0; + for (;;) + { + rpl_gtid gtid; + uint64 sub_id; + + if (gtid_parser_helper(&state_from_master, end, >id) || + !(sub_id= next_subid(gtid.domain_id)) || + record_gtid(thd, >id, sub_id, false) || + update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no)) + return 1; + if (state_from_master == end) + break; + if (*state_from_master != ',') + return 1; + ++state_from_master; + } + return 0; +} + + +bool +rpl_slave_state::is_empty() +{ + uint32 i; + bool result= true; + + lock(); + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + if (e->list) + { + result= false; + break; + } + } + unlock(); + + return result; +} + + +rpl_binlog_state::rpl_binlog_state() +{ + my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id), + sizeof(uint32), NULL, my_free, HASH_UNIQUE); + mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state, + MY_MUTEX_INIT_SLOW); +} + + +void +rpl_binlog_state::reset() +{ + uint32 i; + + for (i= 0; i < hash.records; ++i) + my_hash_free(&((element *)my_hash_element(&hash, i))->hash); + my_hash_reset(&hash); +} + +rpl_binlog_state::~rpl_binlog_state() +{ + reset(); + my_hash_free(&hash); + mysql_mutex_destroy(&LOCK_binlog_state); +} + + +/* + Update replication state with a new GTID. + + If the (domain_id, server_id) pair already exists, then the new GTID replaces + the old one for that domain id. Else a new entry is inserted. + + Returns 0 for ok, 1 for error. +*/ +int +rpl_binlog_state::update(const struct rpl_gtid *gtid) +{ + rpl_gtid *lookup_gtid; + element *elem; + + elem= (element *)my_hash_search(&hash, (const uchar *)(>id->domain_id), 0); + if (elem) + { + /* + By far the most common case is that successive events within same + replication domain have the same server id (it changes only when + switching to a new master). So save a hash lookup in this case. + */ + if (likely(elem->last_gtid->server_id == gtid->server_id)) + { + elem->last_gtid->seq_no= gtid->seq_no; + return 0; + } + + lookup_gtid= (rpl_gtid *) + my_hash_search(&elem->hash, (const uchar *)>id->server_id, 0); + if (lookup_gtid) + { + lookup_gtid->seq_no= gtid->seq_no; + elem->last_gtid= lookup_gtid; + return 0; + } + + /* Allocate a new GTID and insert it. */ + lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); + if (!lookup_gtid) + return 1; + memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); + if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) + { + my_free(lookup_gtid); + return 1; + } + elem->last_gtid= lookup_gtid; + return 0; + } + + /* First time we see this domain_id; allocate a new element. */ + elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME)); + lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME)); + if (elem && lookup_gtid) + { + elem->domain_id= gtid->domain_id; + my_hash_init(&elem->hash, &my_charset_bin, 32, + offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free, + HASH_UNIQUE); + elem->last_gtid= lookup_gtid; + memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid)); + if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid)) + { + lookup_gtid= NULL; /* Do not free. */ + if (0 == my_hash_insert(&hash, (const uchar *)elem)) + return 0; + } + my_hash_free(&elem->hash); + } + + /* An error. */ + if (elem) + my_free(elem); + if (lookup_gtid) + my_free(lookup_gtid); + return 1; +} + + +uint32 +rpl_binlog_state::seq_no_from_state() +{ + ulong i, j; + uint64 seq_no= 0; + + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + for (j= 0; j < e->hash.records; ++j) + { + const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j); + if (gtid->seq_no > seq_no) + seq_no= gtid->seq_no; + } + } + return seq_no; +} + + +/* + Write binlog state to text file, so we can read it in again without having + to scan last binlog file (normal shutdown/startup, not crash recovery). + + The most recent GTID within each domain_id is written after any other GTID + within this domain. +*/ +int +rpl_binlog_state::write_to_iocache(IO_CACHE *dest) +{ + ulong i, j; + char buf[21]; + + for (i= 0; i < hash.records; ++i) + { + size_t res; + element *e= (element *)my_hash_element(&hash, i); + for (j= 0; j <= e->hash.records; ++j) + { + const rpl_gtid *gtid; + if (j < e->hash.records) + { + gtid= (const rpl_gtid *)my_hash_element(&e->hash, j); + if (gtid == e->last_gtid) + continue; + } + else + gtid= e->last_gtid; + + longlong10_to_str(gtid->seq_no, buf, 10); + res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf); + if (res == (size_t) -1) + return 1; + } + } + + return 0; +} + + +int +rpl_binlog_state::read_from_iocache(IO_CACHE *src) +{ + /* 10-digit - 10-digit - 20-digit \n \0 */ + char buf[10+1+10+1+20+1+1]; + char *p, *end; + rpl_gtid gtid; + + reset(); + for (;;) + { + size_t res= my_b_gets(src, buf, sizeof(buf)); + if (!res) + break; + p= buf; + end= buf + res; + if (gtid_parser_helper(&p, end, >id)) + return 1; + if (update(>id)) + return 1; + } + return 0; +} + + +slave_connection_state::slave_connection_state() +{ + my_hash_init(&hash, &my_charset_bin, 32, + offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, my_free, + HASH_UNIQUE); +} + + +uint32 +rpl_binlog_state::count() +{ + uint32 c= 0; + uint32 i; + + for (i= 0; i < hash.records; ++i) + c+= ((element *)my_hash_element(&hash, i))->hash.records; + + return c; +} + + +int +rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) +{ + uint32 i, j, pos; + + pos= 0; + for (i= 0; i < hash.records; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + for (j= 0; j <= e->hash.records; ++j) + { + const rpl_gtid *gtid; + if (j < e->hash.records) + { + gtid= (rpl_gtid *)my_hash_element(&e->hash, j); + if (gtid == e->last_gtid) + continue; + } + else + gtid= e->last_gtid; + + if (pos >= list_size) + return 1; + memcpy(>id_list[pos++], gtid, sizeof(*gtid)); + } + } + + return 0; +} + + +/* + Get a list of the most recently binlogged GTID, for each domain_id. + + This can be used when switching from being a master to being a slave, + to know where to start replicating from the new master. + + The returned list must be de-allocated with my_free(). + + Returns 0 for ok, non-zero for out-of-memory. +*/ +int +rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size) +{ + uint32 i; + + *size= hash.records; + if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME)))) + return 1; + for (i= 0; i < *size; ++i) + { + element *e= (element *)my_hash_element(&hash, i); + memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid)); + } + + return 0; +} + + +slave_connection_state::~slave_connection_state() +{ + my_hash_free(&hash); +} + + +/* + Create a hash from the slave GTID state that is sent to master when slave + connects to start replication. + + The state is sent as <GTID>,<GTID>,...,<GTID>, for example: + + 0-2-112,1-4-1022 + + The state gives for each domain_id the GTID to start replication from for + the corresponding replication stream. So domain_id must be unique. + + Returns 0 if ok, non-zero if error due to malformed input. + + Note that input string is built by slave server, so it will not be incorrect + unless bug/corruption/malicious server. So we just need basic sanity check, + not fancy user-friendly error message. +*/ + +int +slave_connection_state::load(char *slave_request, size_t len) +{ + char *p, *end; + uchar *rec; + rpl_gtid *gtid; + const rpl_gtid *gtid2; + + my_hash_reset(&hash); + p= slave_request; + end= slave_request + len; + if (p == end) + return 0; + for (;;) + { + if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*gtid)); + return 1; + } + gtid= (rpl_gtid *)rec; + if (gtid_parser_helper(&p, end, gtid)) + { + my_free(rec); + my_error(ER_INCORRECT_GTID_STATE, MYF(0)); + return 1; + } + if ((gtid2= (const rpl_gtid *) + my_hash_search(&hash, (const uchar *)(>id->domain_id), 0))) + { + my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id, + gtid->server_id, (ulonglong)gtid->seq_no, gtid2->domain_id, + gtid2->server_id, (ulonglong)gtid2->seq_no, gtid->domain_id); + my_free(rec); + return 1; + } + if (my_hash_insert(&hash, rec)) + { + my_free(rec); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } + if (p == end) + break; /* Finished. */ + if (*p != ',') + { + my_error(ER_INCORRECT_GTID_STATE, MYF(0)); + return 1; + } + ++p; + } + + return 0; +} + + +int +slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count) +{ + uint32 i; + + my_hash_reset(&hash); + for (i= 0; i < count; ++i) + if (update(>id_list[i])) + return 1; + return 0; +} + + +rpl_gtid * +slave_connection_state::find(uint32 domain_id) +{ + return (rpl_gtid *) my_hash_search(&hash, (const uchar *)(&domain_id), 0); +} + + +int +slave_connection_state::update(const rpl_gtid *in_gtid) +{ + rpl_gtid *new_gtid; + uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0); + if (rec) + { + memcpy(rec, in_gtid, sizeof(*in_gtid)); + return 0; + } + + if (!(new_gtid= (rpl_gtid *)my_malloc(sizeof(*new_gtid), MYF(MY_WME)))) + return 1; + memcpy(new_gtid, in_gtid, sizeof(*new_gtid)); + if (my_hash_insert(&hash, (uchar *)new_gtid)) + { + my_free(new_gtid); + return 1; + } + + return 0; +} + + +void +slave_connection_state::remove(const rpl_gtid *in_gtid) +{ + bool err; + uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0); +#ifndef DBUG_OFF + rpl_gtid *slave_gtid= (rpl_gtid *)rec; + DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */); + DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id); + DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no); +#endif + + err= my_hash_delete(&hash, rec); + DBUG_ASSERT(!err); +} + + +int +slave_connection_state::to_string(String *out_str) +{ + uint32 i; + bool first; + + out_str->length(0); + first= true; + for (i= 0; i < hash.records; ++i) + { + const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i); + if (rpl_slave_state_tostring_helper(out_str, gtid, &first)) + return 1; + } + return 0; +} diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h new file mode 100644 index 00000000000..8cd2c362d68 --- /dev/null +++ b/sql/rpl_gtid.h @@ -0,0 +1,170 @@ +/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#ifndef RPL_GTID_H +#define RPL_GTID_H + +/* Definitions for MariaDB global transaction ID (GTID). */ + + +class String; + +struct rpl_gtid +{ + uint32 domain_id; + uint32 server_id; + uint64 seq_no; +}; + + +/* + Replication slave state. + + For every independent replication stream (identified by domain_id), this + remembers the last gtid applied on the slave within this domain. + + Since events are always committed in-order within a single domain, this is + sufficient to maintain the state of the replication slave. +*/ +struct rpl_slave_state +{ + /* Elements in the list of GTIDs kept for each domain_id. */ + struct list_element + { + struct list_element *next; + uint64 sub_id; + uint64 seq_no; + uint32 server_id; + }; + + /* Elements in the HASH that hold the state for one domain_id. */ + struct element + { + struct list_element *list; + uint64 last_sub_id; + uint32 domain_id; + + list_element *grab_list() { list_element *l= list; list= NULL; return l; } + void add(list_element *l) + { + l->next= list; + list= l; + if (last_sub_id < l->sub_id) + last_sub_id= l->sub_id; + } + }; + + /* Mapping from domain_id to its element. */ + HASH hash; + /* Mutex protecting access to the state. */ + mysql_mutex_t LOCK_slave_state; + + bool inited; + bool loaded; + + rpl_slave_state(); + ~rpl_slave_state(); + + void init(); + void deinit(); + void truncate_hash(); + ulong count() const { return hash.records; } + int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no); + int truncate_state_table(THD *thd); + int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, + bool in_transaction); + uint64 next_subid(uint32 domain_id); + int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra); + int load(THD *thd, char *state_from_master, size_t len, bool reset); + bool is_empty(); + + void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); } + void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); } + + element *get_element(uint32 domain_id); + + void update_state_hash(uint64 sub_id, rpl_gtid *gtid); + int record_and_update_gtid(THD *thd, Relay_log_info *rli); +}; + + +/* + Binlog state. + This keeps the last GTID written to the binlog for every distinct + (domain_id, server_id) pair. + This will be logged at the start of the next binlog file as a + Gtid_list_log_event; this way, it is easy to find the binlog file + containing a gigen GTID, by simply scanning backwards from the newest + one until a lower seq_no is found in the Gtid_list_log_event at the + start of a binlog for the given domain_id and server_id. + + We also remember the last logged GTID for every domain_id. This is used + to know where to start when a master is changed to a slave. As a side + effect, it also allows to skip a hash lookup in the very common case of + logging a new GTID with same server id as last GTID. +*/ +struct rpl_binlog_state +{ + struct element { + uint32 domain_id; + HASH hash; /* Containing all server_id for one domain_id */ + /* The most recent entry in the hash. */ + rpl_gtid *last_gtid; + }; + /* Mapping from domain_id to collection of elements. */ + HASH hash; + /* Mutex protecting access to the state. */ + mysql_mutex_t LOCK_binlog_state; + + rpl_binlog_state(); + ~rpl_binlog_state(); + + void reset(); + int update(const struct rpl_gtid *gtid); + uint32 seq_no_from_state(); + int write_to_iocache(IO_CACHE *dest); + int read_from_iocache(IO_CACHE *src); + uint32 count(); + int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size); + int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size); +}; + + +/* + Represent the GTID state that a slave connection to a master requests + the master to start sending binlog events from. +*/ +struct slave_connection_state +{ + /* Mapping from domain_id to the GTID requested for that domain. */ + HASH hash; + + slave_connection_state(); + ~slave_connection_state(); + + int load(char *slave_request, size_t len); + int load(const rpl_gtid *gtid_list, uint32 count); + rpl_gtid *find(uint32 domain_id); + int update(const rpl_gtid *in_gtid); + void remove(const rpl_gtid *gtid); + ulong count() const { return hash.records; } + int to_string(String *out_str); +}; + +extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, + bool *first); +extern const LEX_STRING rpl_gtid_slave_state_table_name; + +#endif /* RPL_GTID_H */ diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index c266c7d0b78..18b47045671 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -37,9 +37,6 @@ static int count_relay_log_space(Relay_log_info* rli); */ rpl_slave_state rpl_global_gtid_slave_state; -const LEX_STRING rpl_gtid_slave_state_table_name= - { STRING_WITH_LEN("rpl_slave_state") }; - // Defined in slave.cc int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 414baf9b762..c5ab25bcd66 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -592,7 +592,6 @@ private: int init_relay_log_info(Relay_log_info* rli, const char* info_fname); -extern const LEX_STRING rpl_gtid_slave_state_table_name; extern struct rpl_slave_state rpl_global_gtid_slave_state; |