summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-03-11 16:02:40 +0100
committerunknown <knielsen@knielsen-hq.org>2013-03-11 16:02:40 +0100
commit5e414f6b06a88477a25407ddb19ec3c9082aa5d5 (patch)
treefb32b7443c1c215cbc51a5ebc6177640e5ba7be4
parent320863530f2038fc8783d30d43a02452c21a186d (diff)
downloadmariadb-git-5e414f6b06a88477a25407ddb19ec3c9082aa5d5.tar.gz
MDEV-26: Global Transaction ID.
Move a bunch of GTID specific code into new file rpl_gtid.cc. Make libmysqld build.
-rw-r--r--libmysqld/CMakeLists.txt1
-rw-r--r--sql/CMakeLists.txt1
-rw-r--r--sql/item_strfunc.cc6
-rw-r--r--sql/log_event.cc1011
-rw-r--r--sql/log_event.h145
-rw-r--r--sql/rpl_gtid.cc1035
-rw-r--r--sql/rpl_gtid.h170
-rw-r--r--sql/rpl_rli.cc3
-rw-r--r--sql/rpl_rli.h1
9 files changed, 1215 insertions, 1158 deletions
diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt
index 0f08ecd76bb..07a0551b4ab 100644
--- a/libmysqld/CMakeLists.txt
+++ b/libmysqld/CMakeLists.txt
@@ -98,6 +98,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc
../sql/rpl_reporting.cc
../sql/sql_expression_cache.cc
../sql/my_apc.cc ../sql/my_apc.h
+ ../sql/rpl_gtid.cc
${GEN_SOURCES}
${MYSYS_LIBWRAP_SOURCE}
)
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 4aa7b8f882e..c6489a45d77 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -88,6 +88,7 @@ SET (SQL_SOURCE
threadpool_common.cc
../sql-common/mysql_async.c
my_apc.cc my_apc.h
+ rpl_gtid.cc
${GEN_SOURCES}
${MYSYS_LIBWRAP_SOURCE}
)
diff --git a/sql/item_strfunc.cc b/sql/item_strfunc.cc
index 79ed795fe3e..3f187ec2af7 100644
--- a/sql/item_strfunc.cc
+++ b/sql/item_strfunc.cc
@@ -2737,6 +2737,11 @@ void Item_func_binlog_gtid_pos::fix_length_and_dec()
String *Item_func_binlog_gtid_pos::val_str(String *str)
{
DBUG_ASSERT(fixed == 1);
+#ifndef HAVE_REPLICATION
+ null_value= 0;
+ str->copy("", 0, system_charset_info);
+ return str;
+#else
String name_str, *name;
longlong pos;
@@ -2757,6 +2762,7 @@ String *Item_func_binlog_gtid_pos::val_str(String *str)
err:
null_value= 1;
return NULL;
+#endif /* !HAVE_REPLICATION */
}
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 4431afa11e1..1309907e91f 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -3726,54 +3726,6 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error)
}
-void
-rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
-{
- int err;
- /*
- Add the gtid to the HASH in the replication slave state.
-
- We must do this only _after_ commit, so that for parallel replication,
- there will not be an attempt to delete the corresponding table row before
- it is even committed.
- */
- lock();
- err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
- unlock();
- if (err)
- {
- sql_print_warning("Slave: Out of memory during slave state maintenance. "
- "Some no longer necessary rows in table "
- "mysql.rpl_slave_state may be left undeleted.");
- /*
- Such failure is not fatal. We will fail to delete the row for this
- GTID, but it will do no harm and will be removed automatically on next
- server restart.
- */
- }
-}
-
-
-int
-rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli)
-{
- uint64 sub_id;
-
- /*
- Update the GTID position, if we have it and did not already update
- it in a GTID transaction.
- */
- if ((sub_id= rli->gtid_sub_id))
- {
- rli->gtid_sub_id= 0;
- if (record_gtid(thd, &rli->current_gtid, sub_id, false))
- return 1;
- update_state_hash(sub_id, &rli->current_gtid);
- }
- return 0;
-}
-
-
/**
@todo
Compare the values of "affected rows" around here. Something
@@ -6142,897 +6094,6 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file)
Global transaction ID stuff
**************************************************************************/
-rpl_slave_state::rpl_slave_state()
- : inited(false), loaded(false)
-{
- my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
- sizeof(uint32), NULL, my_free, HASH_UNIQUE);
-}
-
-
-rpl_slave_state::~rpl_slave_state()
-{
-}
-
-#ifdef MYSQL_SERVER
-void
-rpl_slave_state::init()
-{
- DBUG_ASSERT(!inited);
- mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW);
- inited= true;
-}
-
-
-void
-rpl_slave_state::truncate_hash()
-{
- uint32 i;
-
- for (i= 0; i < hash.records; ++i)
- {
- element *e= (element *)my_hash_element(&hash, i);
- list_element *l= e->list;
- list_element *next;
- while (l)
- {
- next= l->next;
- my_free(l);
- l= next;
- }
- /* The element itself is freed by the hash element free function. */
- }
- my_hash_reset(&hash);
-}
-
-void
-rpl_slave_state::deinit()
-{
- if (!inited)
- return;
- truncate_hash();
- my_hash_free(&hash);
- mysql_mutex_destroy(&LOCK_slave_state);
-}
-#endif
-
-
-int
-rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
- uint64 seq_no)
-{
- element *elem= NULL;
- list_element *list_elem= NULL;
-
- if (!(elem= get_element(domain_id)))
- return 1;
-
- if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
- return 1;
- list_elem->server_id= server_id;
- list_elem->sub_id= sub_id;
- list_elem->seq_no= seq_no;
-
- elem->add(list_elem);
- return 0;
-}
-
-
-struct rpl_slave_state::element *
-rpl_slave_state::get_element(uint32 domain_id)
-{
- struct element *elem;
-
- elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
- if (elem)
- return elem;
-
- if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
- return NULL;
- elem->list= NULL;
- elem->last_sub_id= 0;
- elem->domain_id= domain_id;
- if (my_hash_insert(&hash, (uchar *)elem))
- {
- my_free(elem);
- return NULL;
- }
- return elem;
-}
-
-
-#ifdef MYSQL_SERVER
-#ifdef HAVE_REPLICATION
-int
-rpl_slave_state::truncate_state_table(THD *thd)
-{
- TABLE_LIST tlist;
- int err= 0;
- TABLE *table;
-
- mysql_reset_thd_for_next_command(thd, 0);
-
- tlist.init_one_table(STRING_WITH_LEN("mysql"),
- rpl_gtid_slave_state_table_name.str,
- rpl_gtid_slave_state_table_name.length,
- NULL, TL_WRITE);
- if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
- {
- table= tlist.table;
- table->no_replicate= 1;
- err= table->file->ha_truncate();
-
- if (err)
- {
- ha_rollback_trans(thd, FALSE);
- close_thread_tables(thd);
- ha_rollback_trans(thd, TRUE);
- }
- else
- {
- ha_commit_trans(thd, FALSE);
- close_thread_tables(thd);
- ha_commit_trans(thd, TRUE);
- }
- }
-
- return err;
-}
-
-
-static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
- { { C_STRING_WITH_LEN("domain_id") },
- { C_STRING_WITH_LEN("int(10) unsigned") },
- {NULL, 0} },
- { { C_STRING_WITH_LEN("sub_id") },
- { C_STRING_WITH_LEN("bigint(20) unsigned") },
- {NULL, 0} },
- { { C_STRING_WITH_LEN("server_id") },
- { C_STRING_WITH_LEN("int(10) unsigned") },
- {NULL, 0} },
- { { C_STRING_WITH_LEN("seq_no") },
- { C_STRING_WITH_LEN("bigint(20) unsigned") },
- {NULL, 0} },
-};
-
-static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};
-
-static const TABLE_FIELD_DEF mysql_rpl_slave_state_tabledef= {
- array_elements(mysql_rpl_slave_state_coltypes),
- mysql_rpl_slave_state_coltypes,
- array_elements(mysql_rpl_slave_state_pk_parts),
- mysql_rpl_slave_state_pk_parts
-};
-
-class Gtid_db_intact : public Table_check_intact
-{
-protected:
- void report_error(uint, const char *fmt, ...)
- {
- va_list args;
- va_start(args, fmt);
- error_log_print(ERROR_LEVEL, fmt, args);
- va_end(args);
- }
-};
-
-static Gtid_db_intact gtid_table_intact;
-
-/*
- Write a gtid to the replication slave state table.
-
- Do it as part of the transaction, to get slave crash safety, or as a separate
- transaction if !in_transaction (eg. MyISAM or DDL).
-
- gtid The global transaction id for this event group.
- sub_id Value allocated within the sub_id when the event group was
- read (sub_id must be consistent with commit order in master binlog).
-
- Note that caller must later ensure that the new gtid and sub_id is inserted
- into the appropriate HASH element with rpl_slave_state.add(), so that it can
- be deleted later. But this must only be done after COMMIT if in transaction.
-*/
-int
-rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
- bool in_transaction)
-{
- TABLE_LIST tlist;
- int err= 0;
- bool table_opened= false;
- TABLE *table;
- list_element *elist= 0, *next;
- element *elem;
- ulonglong thd_saved_option= thd->variables.option_bits;
-
- mysql_reset_thd_for_next_command(thd, 0);
-
- tlist.init_one_table(STRING_WITH_LEN("mysql"),
- rpl_gtid_slave_state_table_name.str,
- rpl_gtid_slave_state_table_name.length,
- NULL, TL_WRITE);
- if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
- goto end;
- table_opened= true;
- table= tlist.table;
-
- if ((err= gtid_table_intact.check(table, &mysql_rpl_slave_state_tabledef)))
- {
- my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
- rpl_gtid_slave_state_table_name.str);
- goto end;
- }
-
- table->no_replicate= 1;
- if (!in_transaction)
- thd->variables.option_bits&=
- ~(ulonglong)(OPTION_NOT_AUTOCOMMIT|OPTION_BEGIN);
-
- /*
- ToDo: Check the table definition, error if not as expected.
- We need the correct first 4 columns with correct type, and the primary key.
- */
- bitmap_set_all(table->write_set);
-
- table->field[0]->store((ulonglong)gtid->domain_id, true);
- table->field[1]->store(sub_id, true);
- table->field[2]->store((ulonglong)gtid->server_id, true);
- table->field[3]->store(gtid->seq_no, true);
- if ((err= table->file->ha_write_row(table->record[0])))
- goto end;
-
- lock();
- if ((elem= get_element(gtid->domain_id)) == NULL)
- {
- unlock();
- err= 1;
- goto end;
- }
- elist= elem->grab_list();
- unlock();
-
- if (!elist)
- goto end;
-
- /* Now delete any already committed rows. */
- bitmap_set_bit(table->read_set, table->field[0]->field_index);
- bitmap_set_bit(table->read_set, table->field[1]->field_index);
-
- if ((err= table->file->ha_index_init(0, 0)))
- goto end;
- while (elist)
- {
- uchar key_buffer[4+8];
-
- next= elist->next;
-
- table->field[1]->store(elist->sub_id, true);
- /* domain_id is already set in table->record[0] from write_row() above. */
- key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
- if ((err= table->file->ha_index_read_map(table->record[1], key_buffer,
- HA_WHOLE_KEY, HA_READ_KEY_EXACT)) ||
- (err= table->file->ha_delete_row(table->record[1])))
- break;
- my_free(elist);
- elist= next;
- }
- table->file->ha_index_end();
-
-end:
-
- if (table_opened)
- {
- if (err)
- {
- /*
- ToDo: If error, we need to put any remaining elist back into the HASH so
- we can do another delete attempt later.
- */
- ha_rollback_trans(thd, FALSE);
- close_thread_tables(thd);
- }
- else
- {
- ha_commit_trans(thd, FALSE);
- close_thread_tables(thd);
- }
- }
- thd->variables.option_bits= thd_saved_option;
- return err;
-}
-
-
-uint64
-rpl_slave_state::next_subid(uint32 domain_id)
-{
- uint32 sub_id= 0;
- element *elem;
-
- lock();
- elem= get_element(domain_id);
- if (elem)
- sub_id= ++elem->last_sub_id;
- unlock();
-
- return sub_id;
-}
-#endif
-
-
-static
-bool
-rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first)
-{
- if (*first)
- *first= false;
- else
- if (dest->append(",",1))
- return true;
- return
- dest->append_ulonglong(gtid->domain_id) ||
- dest->append("-",1) ||
- dest->append_ulonglong(gtid->server_id) ||
- dest->append("-",1) ||
- dest->append_ulonglong(gtid->seq_no);
-}
-
-
-/*
- Prepare the current slave state as a string, suitable for sending to the
- master to request to receive binlog events starting from that GTID state.
-
- The state consists of the most recently applied GTID for each domain_id,
- ie. the one with the highest sub_id within each domain_id.
-*/
-
-int
-rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
-{
- bool first= true;
- uint32 i;
- HASH gtid_hash;
- uchar *rec;
- rpl_gtid *gtid;
- int res= 1;
-
- my_hash_init(&gtid_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id),
- sizeof(uint32), NULL, NULL, HASH_UNIQUE);
- for (i= 0; i < num_extra; ++i)
- if (my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
- goto err;
-
- lock();
-
- for (i= 0; i < hash.records; ++i)
- {
- uint64 best_sub_id;
- rpl_gtid best_gtid;
- element *e= (element *)my_hash_element(&hash, i);
- list_element *l= e->list;
-
- if (!l)
- continue; /* Nothing here */
-
- best_gtid.domain_id= e->domain_id;
- best_gtid.server_id= l->server_id;
- best_gtid.seq_no= l->seq_no;
- best_sub_id= l->sub_id;
- while ((l= l->next))
- {
- if (l->sub_id > best_sub_id)
- {
- best_sub_id= l->sub_id;
- best_gtid.server_id= l->server_id;
- best_gtid.seq_no= l->seq_no;
- }
- }
-
- /* Check if we have something newer in the extra list. */
- rec= my_hash_search(&gtid_hash, (const uchar *)&best_gtid.domain_id, 0);
- if (rec)
- {
- gtid= (rpl_gtid *)rec;
- if (gtid->seq_no > best_gtid.seq_no)
- memcpy(&best_gtid, gtid, sizeof(best_gtid));
- if (my_hash_delete(&gtid_hash, rec))
- {
- unlock();
- goto err;
- }
- }
-
- if (rpl_slave_state_tostring_helper(dest, &best_gtid, &first))
- {
- unlock();
- goto err;
- }
- }
-
- unlock();
-
- /* Also add any remaining extra domain_ids. */
- for (i= 0; i < gtid_hash.records; ++i)
- {
- gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
- if (rpl_slave_state_tostring_helper(dest, gtid, &first))
- goto err;
- }
-
- res= 0;
-
-err:
- my_hash_free(&gtid_hash);
-
- return res;
-}
-
-
-/*
- Parse a GTID at the start of a string, and update the pointer to point
- at the first character after the parsed GTID.
-
- GTID can be in short form with domain_id=0 implied, SERVERID-SEQNO.
- Or long form, DOMAINID-SERVERID-SEQNO.
-
- Returns 0 on ok, non-zero on parse error.
-*/
-static int
-gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid)
-{
- char *q;
- char *p= *ptr;
- uint64 v1, v2, v3;
- int err= 0;
-
- q= end;
- v1= (uint64)my_strtoll10(p, &q, &err);
- if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-')
- return 1;
- p= q+1;
- q= end;
- v2= (uint64)my_strtoll10(p, &q, &err);
- if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-')
- return 1;
- p= q+1;
- q= end;
- v3= (uint64)my_strtoll10(p, &q, &err);
- if (err != 0)
- return 1;
-
- out_gtid->domain_id= v1;
- out_gtid->server_id= v2;
- out_gtid->seq_no= v3;
- *ptr= q;
- return 0;
-}
-
-
-/*
- Update the slave replication state with the GTID position obtained from
- master when connecting with old-style (filename,offset) position.
-
- If RESET is true then all existing entries are removed. Otherwise only
- domain_ids mentioned in the STATE_FROM_MASTER are changed.
-
- Returns 0 if ok, non-zero if error.
-*/
-int
-rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
- bool reset)
-{
- char *end= state_from_master + len;
-
- if (reset)
- {
- if (truncate_state_table(thd))
- return 1;
- truncate_hash();
- }
- if (state_from_master == end)
- return 0;
- for (;;)
- {
- rpl_gtid gtid;
- uint64 sub_id;
-
- if (gtid_parser_helper(&state_from_master, end, &gtid) ||
- !(sub_id= next_subid(gtid.domain_id)) ||
- record_gtid(thd, &gtid, sub_id, false) ||
- update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no))
- return 1;
- if (state_from_master == end)
- break;
- if (*state_from_master != ',')
- return 1;
- ++state_from_master;
- }
- return 0;
-}
-
-
-bool
-rpl_slave_state::is_empty()
-{
- uint32 i;
- bool result= true;
-
- lock();
- for (i= 0; i < hash.records; ++i)
- {
- element *e= (element *)my_hash_element(&hash, i);
- if (e->list)
- {
- result= false;
- break;
- }
- }
- unlock();
-
- return result;
-}
-
-
-rpl_binlog_state::rpl_binlog_state()
-{
- my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
- sizeof(uint32), NULL, my_free, HASH_UNIQUE);
- mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
- MY_MUTEX_INIT_SLOW);
-}
-
-
-void
-rpl_binlog_state::reset()
-{
- uint32 i;
-
- for (i= 0; i < hash.records; ++i)
- my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
- my_hash_reset(&hash);
-}
-
-rpl_binlog_state::~rpl_binlog_state()
-{
- reset();
- my_hash_free(&hash);
- mysql_mutex_destroy(&LOCK_binlog_state);
-}
-
-
-/*
- Update replication state with a new GTID.
-
- If the (domain_id, server_id) pair already exists, then the new GTID replaces
- the old one for that domain id. Else a new entry is inserted.
-
- Returns 0 for ok, 1 for error.
-*/
-int
-rpl_binlog_state::update(const struct rpl_gtid *gtid)
-{
- rpl_gtid *lookup_gtid;
- element *elem;
-
- elem= (element *)my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0);
- if (elem)
- {
- /*
- By far the most common case is that successive events within same
- replication domain have the same server id (it changes only when
- switching to a new master). So save a hash lookup in this case.
- */
- if (likely(elem->last_gtid->server_id == gtid->server_id))
- {
- elem->last_gtid->seq_no= gtid->seq_no;
- return 0;
- }
-
- lookup_gtid= (rpl_gtid *)
- my_hash_search(&elem->hash, (const uchar *)&gtid->server_id, 0);
- if (lookup_gtid)
- {
- lookup_gtid->seq_no= gtid->seq_no;
- elem->last_gtid= lookup_gtid;
- return 0;
- }
-
- /* Allocate a new GTID and insert it. */
- lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
- if (!lookup_gtid)
- return 1;
- memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
- if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
- {
- my_free(lookup_gtid);
- return 1;
- }
- elem->last_gtid= lookup_gtid;
- return 0;
- }
-
- /* First time we see this domain_id; allocate a new element. */
- elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
- lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
- if (elem && lookup_gtid)
- {
- elem->domain_id= gtid->domain_id;
- my_hash_init(&elem->hash, &my_charset_bin, 32,
- offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
- HASH_UNIQUE);
- elem->last_gtid= lookup_gtid;
- memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
- if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
- {
- lookup_gtid= NULL; /* Do not free. */
- if (0 == my_hash_insert(&hash, (const uchar *)elem))
- return 0;
- }
- my_hash_free(&elem->hash);
- }
-
- /* An error. */
- if (elem)
- my_free(elem);
- if (lookup_gtid)
- my_free(lookup_gtid);
- return 1;
-}
-
-
-uint32
-rpl_binlog_state::seq_no_from_state()
-{
- ulong i, j;
- uint64 seq_no= 0;
-
- for (i= 0; i < hash.records; ++i)
- {
- element *e= (element *)my_hash_element(&hash, i);
- for (j= 0; j < e->hash.records; ++j)
- {
- const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
- if (gtid->seq_no > seq_no)
- seq_no= gtid->seq_no;
- }
- }
- return seq_no;
-}
-
-
-/*
- Write binlog state to text file, so we can read it in again without having
- to scan last binlog file (normal shutdown/startup, not crash recovery).
-
- The most recent GTID within each domain_id is written after any other GTID
- within this domain.
-*/
-int
-rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
-{
- ulong i, j;
- char buf[21];
-
- for (i= 0; i < hash.records; ++i)
- {
- size_t res;
- element *e= (element *)my_hash_element(&hash, i);
- for (j= 0; j <= e->hash.records; ++j)
- {
- const rpl_gtid *gtid;
- if (j < e->hash.records)
- {
- gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
- if (gtid == e->last_gtid)
- continue;
- }
- else
- gtid= e->last_gtid;
-
- longlong10_to_str(gtid->seq_no, buf, 10);
- res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf);
- if (res == (size_t) -1)
- return 1;
- }
- }
-
- return 0;
-}
-
-
-int
-rpl_binlog_state::read_from_iocache(IO_CACHE *src)
-{
- /* 10-digit - 10-digit - 20-digit \n \0 */
- char buf[10+1+10+1+20+1+1];
- char *p, *end;
- rpl_gtid gtid;
-
- reset();
- for (;;)
- {
- size_t res= my_b_gets(src, buf, sizeof(buf));
- if (!res)
- break;
- p= buf;
- end= buf + res;
- if (gtid_parser_helper(&p, end, &gtid))
- return 1;
- if (update(&gtid))
- return 1;
- }
- return 0;
-}
-
-
-slave_connection_state::slave_connection_state()
-{
- my_hash_init(&hash, &my_charset_bin, 32,
- offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, my_free,
- HASH_UNIQUE);
-}
-
-
-slave_connection_state::~slave_connection_state()
-{
- my_hash_free(&hash);
-}
-
-
-/*
- Create a hash from the slave GTID state that is sent to master when slave
- connects to start replication.
-
- The state is sent as <GTID>,<GTID>,...,<GTID>, for example:
-
- 0-2-112,1-4-1022
-
- The state gives for each domain_id the GTID to start replication from for
- the corresponding replication stream. So domain_id must be unique.
-
- Returns 0 if ok, non-zero if error due to malformed input.
-
- Note that input string is built by slave server, so it will not be incorrect
- unless bug/corruption/malicious server. So we just need basic sanity check,
- not fancy user-friendly error message.
-*/
-
-int
-slave_connection_state::load(char *slave_request, size_t len)
-{
- char *p, *end;
- uchar *rec;
- rpl_gtid *gtid;
- const rpl_gtid *gtid2;
-
- my_hash_reset(&hash);
- p= slave_request;
- end= slave_request + len;
- if (p == end)
- return 0;
- for (;;)
- {
- if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME))))
- {
- my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*gtid));
- return 1;
- }
- gtid= (rpl_gtid *)rec;
- if (gtid_parser_helper(&p, end, gtid))
- {
- my_free(rec);
- my_error(ER_INCORRECT_GTID_STATE, MYF(0));
- return 1;
- }
- if ((gtid2= (const rpl_gtid *)
- my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0)))
- {
- my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id,
- gtid->server_id, (ulonglong)gtid->seq_no, gtid2->domain_id,
- gtid2->server_id, (ulonglong)gtid2->seq_no, gtid->domain_id);
- my_free(rec);
- return 1;
- }
- if (my_hash_insert(&hash, rec))
- {
- my_free(rec);
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
- return 1;
- }
- if (p == end)
- break; /* Finished. */
- if (*p != ',')
- {
- my_error(ER_INCORRECT_GTID_STATE, MYF(0));
- return 1;
- }
- ++p;
- }
-
- return 0;
-}
-
-
-int
-slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
-{
- uint32 i;
-
- my_hash_reset(&hash);
- for (i= 0; i < count; ++i)
- if (update(&gtid_list[i]))
- return 1;
- return 0;
-}
-
-
-rpl_gtid *
-slave_connection_state::find(uint32 domain_id)
-{
- return (rpl_gtid *) my_hash_search(&hash, (const uchar *)(&domain_id), 0);
-}
-
-
-int
-slave_connection_state::update(const rpl_gtid *in_gtid)
-{
- rpl_gtid *new_gtid;
- uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
- if (rec)
- {
- memcpy(rec, in_gtid, sizeof(*in_gtid));
- return 0;
- }
-
- if (!(new_gtid= (rpl_gtid *)my_malloc(sizeof(*new_gtid), MYF(MY_WME))))
- return 1;
- memcpy(new_gtid, in_gtid, sizeof(*new_gtid));
- if (my_hash_insert(&hash, (uchar *)new_gtid))
- {
- my_free(new_gtid);
- return 1;
- }
-
- return 0;
-}
-
-
-void
-slave_connection_state::remove(const rpl_gtid *in_gtid)
-{
- bool err;
- uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
-#ifndef DBUG_OFF
- rpl_gtid *slave_gtid= (rpl_gtid *)rec;
- DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */);
- DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
- DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
-#endif
-
- err= my_hash_delete(&hash, rec);
- DBUG_ASSERT(!err);
-}
-
-
-int
-slave_connection_state::to_string(String *out_str)
-{
- uint32 i;
- bool first;
-
- out_str->length(0);
- first= true;
- for (i= 0; i < hash.records; ++i)
- {
- const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i);
- if (rpl_slave_state_tostring_helper(out_str, gtid, &first))
- return 1;
- }
- return 0;
-}
-
-
-#endif /* MYSQL_SERVER */
-
-
Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
const Format_description_log_event *description_event)
: Log_event(buf, description_event), seq_no(0)
@@ -7284,78 +6345,6 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
}
-uint32
-rpl_binlog_state::count()
-{
- uint32 c= 0;
- uint32 i;
-
- for (i= 0; i < hash.records; ++i)
- c+= ((element *)my_hash_element(&hash, i))->hash.records;
-
- return c;
-}
-
-
-int
-rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
-{
- uint32 i, j, pos;
-
- pos= 0;
- for (i= 0; i < hash.records; ++i)
- {
- element *e= (element *)my_hash_element(&hash, i);
- for (j= 0; j <= e->hash.records; ++j)
- {
- const rpl_gtid *gtid;
- if (j < e->hash.records)
- {
- gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
- if (gtid == e->last_gtid)
- continue;
- }
- else
- gtid= e->last_gtid;
-
- if (pos >= list_size)
- return 1;
- memcpy(&gtid_list[pos++], gtid, sizeof(*gtid));
- }
- }
-
- return 0;
-}
-
-
-/*
- Get a list of the most recently binlogged GTID, for each domain_id.
-
- This can be used when switching from being a master to being a slave,
- to know where to start replicating from the new master.
-
- The returned list must be de-allocated with my_free().
-
- Returns 0 for ok, non-zero for out-of-memory.
-*/
-int
-rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
-{
- uint32 i;
-
- *size= hash.records;
- if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME))))
- return 1;
- for (i= 0; i < *size; ++i)
- {
- element *e= (element *)my_hash_element(&hash, i);
- memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid));
- }
-
- return 0;
-}
-
-
#ifdef MYSQL_SERVER
Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
diff --git a/sql/log_event.h b/sql/log_event.h
index 89f2d37e714..6b91756cc8a 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -49,6 +49,8 @@
#include "sql_class.h" /* THD */
#endif
+#include "rpl_gtid.h"
+
/* Forward declarations */
class String;
@@ -2947,149 +2949,6 @@ public:
};
-struct rpl_gtid
-{
- uint32 domain_id;
- uint32 server_id;
- uint64 seq_no;
-};
-
-
-/*
- Replication slave state.
-
- For every independent replication stream (identified by domain_id), this
- remembers the last gtid applied on the slave within this domain.
-
- Since events are always committed in-order within a single domain, this is
- sufficient to maintain the state of the replication slave.
-*/
-struct rpl_slave_state
-{
- /* Elements in the list of GTIDs kept for each domain_id. */
- struct list_element
- {
- struct list_element *next;
- uint64 sub_id;
- uint64 seq_no;
- uint32 server_id;
- };
-
- /* Elements in the HASH that hold the state for one domain_id. */
- struct element
- {
- struct list_element *list;
- uint64 last_sub_id;
- uint32 domain_id;
-
- list_element *grab_list() { list_element *l= list; list= NULL; return l; }
- void add(list_element *l)
- {
- l->next= list;
- list= l;
- if (last_sub_id < l->sub_id)
- last_sub_id= l->sub_id;
- }
- };
-
- /* Mapping from domain_id to its element. */
- HASH hash;
- /* Mutex protecting access to the state. */
- mysql_mutex_t LOCK_slave_state;
-
- bool inited;
- bool loaded;
-
- rpl_slave_state();
- ~rpl_slave_state();
-
- void init();
- void deinit();
- void truncate_hash();
- ulong count() const { return hash.records; }
- int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no);
- int truncate_state_table(THD *thd);
- int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
- bool in_transaction);
- uint64 next_subid(uint32 domain_id);
- int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra);
- int load(THD *thd, char *state_from_master, size_t len, bool reset);
- bool is_empty();
-
- void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
- void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
-
- element *get_element(uint32 domain_id);
-
- void update_state_hash(uint64 sub_id, rpl_gtid *gtid);
- int record_and_update_gtid(THD *thd, Relay_log_info *rli);
-};
-
-
-/*
- Binlog state.
- This keeps the last GTID written to the binlog for every distinct
- (domain_id, server_id) pair.
- This will be logged at the start of the next binlog file as a
- Gtid_list_log_event; this way, it is easy to find the binlog file
- containing a gigen GTID, by simply scanning backwards from the newest
- one until a lower seq_no is found in the Gtid_list_log_event at the
- start of a binlog for the given domain_id and server_id.
-
- We also remember the last logged GTID for every domain_id. This is used
- to know where to start when a master is changed to a slave. As a side
- effect, it also allows to skip a hash lookup in the very common case of
- logging a new GTID with same server id as last GTID.
-*/
-struct rpl_binlog_state
-{
- struct element {
- uint32 domain_id;
- HASH hash; /* Containing all server_id for one domain_id */
- /* The most recent entry in the hash. */
- rpl_gtid *last_gtid;
- };
- /* Mapping from domain_id to collection of elements. */
- HASH hash;
- /* Mutex protecting access to the state. */
- mysql_mutex_t LOCK_binlog_state;
-
- rpl_binlog_state();
- ~rpl_binlog_state();
-
- void reset();
- int update(const struct rpl_gtid *gtid);
- uint32 seq_no_from_state();
- int write_to_iocache(IO_CACHE *dest);
- int read_from_iocache(IO_CACHE *src);
- uint32 count();
- int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
- int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
-};
-
-
-/*
- Represent the GTID state that a slave connection to a master requests
- the master to start sending binlog events from.
-*/
-struct slave_connection_state
-{
- /* Mapping from domain_id to the GTID requested for that domain. */
- HASH hash;
-
- slave_connection_state();
- ~slave_connection_state();
-
- int load(char *slave_request, size_t len);
- int load(const rpl_gtid *gtid_list, uint32 count);
- rpl_gtid *find(uint32 domain_id);
- int update(const rpl_gtid *in_gtid);
- void remove(const rpl_gtid *gtid);
- ulong count() const { return hash.records; }
- int to_string(String *out_str);
-};
-
-
/**
@class Gtid_log_event
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
new file mode 100644
index 00000000000..62c271758ac
--- /dev/null
+++ b/sql/rpl_gtid.cc
@@ -0,0 +1,1035 @@
+/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+
+/* Definitions for MariaDB global transaction ID (GTID). */
+
+
+#include "sql_priv.h"
+#include "my_sys.h"
+#include "unireg.h"
+#include "my_global.h"
+#include "sql_base.h"
+#include "sql_parse.h"
+#include "key.h"
+#include "rpl_gtid.h"
+#include "rpl_rli.h"
+
+
+const LEX_STRING rpl_gtid_slave_state_table_name=
+ { STRING_WITH_LEN("rpl_slave_state") };
+
+
+void
+rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
+{
+ int err;
+ /*
+ Add the gtid to the HASH in the replication slave state.
+
+ We must do this only _after_ commit, so that for parallel replication,
+ there will not be an attempt to delete the corresponding table row before
+ it is even committed.
+ */
+ lock();
+ err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
+ unlock();
+ if (err)
+ {
+ sql_print_warning("Slave: Out of memory during slave state maintenance. "
+ "Some no longer necessary rows in table "
+ "mysql.rpl_slave_state may be left undeleted.");
+ /*
+ Such failure is not fatal. We will fail to delete the row for this
+ GTID, but it will do no harm and will be removed automatically on next
+ server restart.
+ */
+ }
+}
+
+
+int
+rpl_slave_state::record_and_update_gtid(THD *thd, Relay_log_info *rli)
+{
+ uint64 sub_id;
+
+ /*
+ Update the GTID position, if we have it and did not already update
+ it in a GTID transaction.
+ */
+ if ((sub_id= rli->gtid_sub_id))
+ {
+ rli->gtid_sub_id= 0;
+ if (record_gtid(thd, &rli->current_gtid, sub_id, false))
+ return 1;
+ update_state_hash(sub_id, &rli->current_gtid);
+ }
+ return 0;
+}
+
+
+rpl_slave_state::rpl_slave_state()
+ : inited(false), loaded(false)
+{
+ my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
+ sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+}
+
+
+rpl_slave_state::~rpl_slave_state()
+{
+}
+
+
+void
+rpl_slave_state::init()
+{
+ DBUG_ASSERT(!inited);
+ mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW);
+ inited= true;
+}
+
+
+void
+rpl_slave_state::truncate_hash()
+{
+ uint32 i;
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ list_element *l= e->list;
+ list_element *next;
+ while (l)
+ {
+ next= l->next;
+ my_free(l);
+ l= next;
+ }
+ /* The element itself is freed by the hash element free function. */
+ }
+ my_hash_reset(&hash);
+}
+
+void
+rpl_slave_state::deinit()
+{
+ if (!inited)
+ return;
+ truncate_hash();
+ my_hash_free(&hash);
+ mysql_mutex_destroy(&LOCK_slave_state);
+}
+
+
+int
+rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
+ uint64 seq_no)
+{
+ element *elem= NULL;
+ list_element *list_elem= NULL;
+
+ if (!(elem= get_element(domain_id)))
+ return 1;
+
+ if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
+ return 1;
+ list_elem->server_id= server_id;
+ list_elem->sub_id= sub_id;
+ list_elem->seq_no= seq_no;
+
+ elem->add(list_elem);
+ return 0;
+}
+
+
+struct rpl_slave_state::element *
+rpl_slave_state::get_element(uint32 domain_id)
+{
+ struct element *elem;
+
+ elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
+ if (elem)
+ return elem;
+
+ if (!(elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME))))
+ return NULL;
+ elem->list= NULL;
+ elem->last_sub_id= 0;
+ elem->domain_id= domain_id;
+ if (my_hash_insert(&hash, (uchar *)elem))
+ {
+ my_free(elem);
+ return NULL;
+ }
+ return elem;
+}
+
+
+int
+rpl_slave_state::truncate_state_table(THD *thd)
+{
+ TABLE_LIST tlist;
+ int err= 0;
+ TABLE *table;
+
+ mysql_reset_thd_for_next_command(thd, 0);
+
+ tlist.init_one_table(STRING_WITH_LEN("mysql"),
+ rpl_gtid_slave_state_table_name.str,
+ rpl_gtid_slave_state_table_name.length,
+ NULL, TL_WRITE);
+ if (!(err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ {
+ table= tlist.table;
+ table->no_replicate= 1;
+ err= table->file->ha_truncate();
+
+ if (err)
+ {
+ ha_rollback_trans(thd, FALSE);
+ close_thread_tables(thd);
+ ha_rollback_trans(thd, TRUE);
+ }
+ else
+ {
+ ha_commit_trans(thd, FALSE);
+ close_thread_tables(thd);
+ ha_commit_trans(thd, TRUE);
+ }
+ }
+
+ return err;
+}
+
+
+static const TABLE_FIELD_TYPE mysql_rpl_slave_state_coltypes[4]= {
+ { { C_STRING_WITH_LEN("domain_id") },
+ { C_STRING_WITH_LEN("int(10) unsigned") },
+ {NULL, 0} },
+ { { C_STRING_WITH_LEN("sub_id") },
+ { C_STRING_WITH_LEN("bigint(20) unsigned") },
+ {NULL, 0} },
+ { { C_STRING_WITH_LEN("server_id") },
+ { C_STRING_WITH_LEN("int(10) unsigned") },
+ {NULL, 0} },
+ { { C_STRING_WITH_LEN("seq_no") },
+ { C_STRING_WITH_LEN("bigint(20) unsigned") },
+ {NULL, 0} },
+};
+
+static const uint mysql_rpl_slave_state_pk_parts[]= {0, 1};
+
+static const TABLE_FIELD_DEF mysql_rpl_slave_state_tabledef= {
+ array_elements(mysql_rpl_slave_state_coltypes),
+ mysql_rpl_slave_state_coltypes,
+ array_elements(mysql_rpl_slave_state_pk_parts),
+ mysql_rpl_slave_state_pk_parts
+};
+
+class Gtid_db_intact : public Table_check_intact
+{
+protected:
+ void report_error(uint, const char *fmt, ...)
+ {
+ va_list args;
+ va_start(args, fmt);
+ error_log_print(ERROR_LEVEL, fmt, args);
+ va_end(args);
+ }
+};
+
+static Gtid_db_intact gtid_table_intact;
+
+/*
+ Write a gtid to the replication slave state table.
+
+ Do it as part of the transaction, to get slave crash safety, or as a separate
+ transaction if !in_transaction (eg. MyISAM or DDL).
+
+ gtid The global transaction id for this event group.
+ sub_id Value allocated within the sub_id when the event group was
+ read (sub_id must be consistent with commit order in master binlog).
+
+ Note that caller must later ensure that the new gtid and sub_id is inserted
+ into the appropriate HASH element with rpl_slave_state.add(), so that it can
+ be deleted later. But this must only be done after COMMIT if in transaction.
+*/
+int
+rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
+ bool in_transaction)
+{
+ TABLE_LIST tlist;
+ int err= 0;
+ bool table_opened= false;
+ TABLE *table;
+ list_element *elist= 0, *next;
+ element *elem;
+ ulonglong thd_saved_option= thd->variables.option_bits;
+
+ mysql_reset_thd_for_next_command(thd, 0);
+
+ tlist.init_one_table(STRING_WITH_LEN("mysql"),
+ rpl_gtid_slave_state_table_name.str,
+ rpl_gtid_slave_state_table_name.length,
+ NULL, TL_WRITE);
+ if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ goto end;
+ table_opened= true;
+ table= tlist.table;
+
+ if ((err= gtid_table_intact.check(table, &mysql_rpl_slave_state_tabledef)))
+ {
+ my_error(ER_GTID_OPEN_TABLE_FAILED, MYF(0), "mysql",
+ rpl_gtid_slave_state_table_name.str);
+ goto end;
+ }
+
+ table->no_replicate= 1;
+ if (!in_transaction)
+ thd->variables.option_bits&=
+ ~(ulonglong)(OPTION_NOT_AUTOCOMMIT|OPTION_BEGIN);
+
+ /*
+ ToDo: Check the table definition, error if not as expected.
+ We need the correct first 4 columns with correct type, and the primary key.
+ */
+ bitmap_set_all(table->write_set);
+
+ table->field[0]->store((ulonglong)gtid->domain_id, true);
+ table->field[1]->store(sub_id, true);
+ table->field[2]->store((ulonglong)gtid->server_id, true);
+ table->field[3]->store(gtid->seq_no, true);
+ if ((err= table->file->ha_write_row(table->record[0])))
+ goto end;
+
+ lock();
+ if ((elem= get_element(gtid->domain_id)) == NULL)
+ {
+ unlock();
+ err= 1;
+ goto end;
+ }
+ elist= elem->grab_list();
+ unlock();
+
+ if (!elist)
+ goto end;
+
+ /* Now delete any already committed rows. */
+ bitmap_set_bit(table->read_set, table->field[0]->field_index);
+ bitmap_set_bit(table->read_set, table->field[1]->field_index);
+
+ if ((err= table->file->ha_index_init(0, 0)))
+ goto end;
+ while (elist)
+ {
+ uchar key_buffer[4+8];
+
+ next= elist->next;
+
+ table->field[1]->store(elist->sub_id, true);
+ /* domain_id is already set in table->record[0] from write_row() above. */
+ key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
+ if ((err= table->file->ha_index_read_map(table->record[1], key_buffer,
+ HA_WHOLE_KEY, HA_READ_KEY_EXACT)) ||
+ (err= table->file->ha_delete_row(table->record[1])))
+ break;
+ my_free(elist);
+ elist= next;
+ }
+ table->file->ha_index_end();
+
+end:
+
+ if (table_opened)
+ {
+ if (err)
+ {
+ /*
+ ToDo: If error, we need to put any remaining elist back into the HASH so
+ we can do another delete attempt later.
+ */
+ ha_rollback_trans(thd, FALSE);
+ close_thread_tables(thd);
+ }
+ else
+ {
+ ha_commit_trans(thd, FALSE);
+ close_thread_tables(thd);
+ }
+ }
+ thd->variables.option_bits= thd_saved_option;
+ return err;
+}
+
+
+uint64
+rpl_slave_state::next_subid(uint32 domain_id)
+{
+ uint32 sub_id= 0;
+ element *elem;
+
+ lock();
+ elem= get_element(domain_id);
+ if (elem)
+ sub_id= ++elem->last_sub_id;
+ unlock();
+
+ return sub_id;
+}
+
+
+bool
+rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first)
+{
+ if (*first)
+ *first= false;
+ else
+ if (dest->append(",",1))
+ return true;
+ return
+ dest->append_ulonglong(gtid->domain_id) ||
+ dest->append("-",1) ||
+ dest->append_ulonglong(gtid->server_id) ||
+ dest->append("-",1) ||
+ dest->append_ulonglong(gtid->seq_no);
+}
+
+
+/*
+ Prepare the current slave state as a string, suitable for sending to the
+ master to request to receive binlog events starting from that GTID state.
+
+ The state consists of the most recently applied GTID for each domain_id,
+ ie. the one with the highest sub_id within each domain_id.
+*/
+
+int
+rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
+{
+ bool first= true;
+ uint32 i;
+ HASH gtid_hash;
+ uchar *rec;
+ rpl_gtid *gtid;
+ int res= 1;
+
+ my_hash_init(&gtid_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id),
+ sizeof(uint32), NULL, NULL, HASH_UNIQUE);
+ for (i= 0; i < num_extra; ++i)
+ if (my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
+ goto err;
+
+ lock();
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ uint64 best_sub_id;
+ rpl_gtid best_gtid;
+ element *e= (element *)my_hash_element(&hash, i);
+ list_element *l= e->list;
+
+ if (!l)
+ continue; /* Nothing here */
+
+ best_gtid.domain_id= e->domain_id;
+ best_gtid.server_id= l->server_id;
+ best_gtid.seq_no= l->seq_no;
+ best_sub_id= l->sub_id;
+ while ((l= l->next))
+ {
+ if (l->sub_id > best_sub_id)
+ {
+ best_sub_id= l->sub_id;
+ best_gtid.server_id= l->server_id;
+ best_gtid.seq_no= l->seq_no;
+ }
+ }
+
+ /* Check if we have something newer in the extra list. */
+ rec= my_hash_search(&gtid_hash, (const uchar *)&best_gtid.domain_id, 0);
+ if (rec)
+ {
+ gtid= (rpl_gtid *)rec;
+ if (gtid->seq_no > best_gtid.seq_no)
+ memcpy(&best_gtid, gtid, sizeof(best_gtid));
+ if (my_hash_delete(&gtid_hash, rec))
+ {
+ unlock();
+ goto err;
+ }
+ }
+
+ if (rpl_slave_state_tostring_helper(dest, &best_gtid, &first))
+ {
+ unlock();
+ goto err;
+ }
+ }
+
+ unlock();
+
+ /* Also add any remaining extra domain_ids. */
+ for (i= 0; i < gtid_hash.records; ++i)
+ {
+ gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
+ if (rpl_slave_state_tostring_helper(dest, gtid, &first))
+ goto err;
+ }
+
+ res= 0;
+
+err:
+ my_hash_free(&gtid_hash);
+
+ return res;
+}
+
+
+/*
+ Parse a GTID at the start of a string, and update the pointer to point
+ at the first character after the parsed GTID.
+
+ GTID can be in short form with domain_id=0 implied, SERVERID-SEQNO.
+ Or long form, DOMAINID-SERVERID-SEQNO.
+
+ Returns 0 on ok, non-zero on parse error.
+*/
+static int
+gtid_parser_helper(char **ptr, char *end, rpl_gtid *out_gtid)
+{
+ char *q;
+ char *p= *ptr;
+ uint64 v1, v2, v3;
+ int err= 0;
+
+ q= end;
+ v1= (uint64)my_strtoll10(p, &q, &err);
+ if (err != 0 || v1 > (uint32)0xffffffff || q == end || *q != '-')
+ return 1;
+ p= q+1;
+ q= end;
+ v2= (uint64)my_strtoll10(p, &q, &err);
+ if (err != 0 || v2 > (uint32)0xffffffff || q == end || *q != '-')
+ return 1;
+ p= q+1;
+ q= end;
+ v3= (uint64)my_strtoll10(p, &q, &err);
+ if (err != 0)
+ return 1;
+
+ out_gtid->domain_id= v1;
+ out_gtid->server_id= v2;
+ out_gtid->seq_no= v3;
+ *ptr= q;
+ return 0;
+}
+
+
+/*
+ Update the slave replication state with the GTID position obtained from
+ master when connecting with old-style (filename,offset) position.
+
+ If RESET is true then all existing entries are removed. Otherwise only
+ domain_ids mentioned in the STATE_FROM_MASTER are changed.
+
+ Returns 0 if ok, non-zero if error.
+*/
+int
+rpl_slave_state::load(THD *thd, char *state_from_master, size_t len,
+ bool reset)
+{
+ char *end= state_from_master + len;
+
+ if (reset)
+ {
+ if (truncate_state_table(thd))
+ return 1;
+ truncate_hash();
+ }
+ if (state_from_master == end)
+ return 0;
+ for (;;)
+ {
+ rpl_gtid gtid;
+ uint64 sub_id;
+
+ if (gtid_parser_helper(&state_from_master, end, &gtid) ||
+ !(sub_id= next_subid(gtid.domain_id)) ||
+ record_gtid(thd, &gtid, sub_id, false) ||
+ update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no))
+ return 1;
+ if (state_from_master == end)
+ break;
+ if (*state_from_master != ',')
+ return 1;
+ ++state_from_master;
+ }
+ return 0;
+}
+
+
+bool
+rpl_slave_state::is_empty()
+{
+ uint32 i;
+ bool result= true;
+
+ lock();
+ for (i= 0; i < hash.records; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ if (e->list)
+ {
+ result= false;
+ break;
+ }
+ }
+ unlock();
+
+ return result;
+}
+
+
+rpl_binlog_state::rpl_binlog_state()
+{
+ my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
+ sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
+ MY_MUTEX_INIT_SLOW);
+}
+
+
+void
+rpl_binlog_state::reset()
+{
+ uint32 i;
+
+ for (i= 0; i < hash.records; ++i)
+ my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
+ my_hash_reset(&hash);
+}
+
+rpl_binlog_state::~rpl_binlog_state()
+{
+ reset();
+ my_hash_free(&hash);
+ mysql_mutex_destroy(&LOCK_binlog_state);
+}
+
+
+/*
+ Update replication state with a new GTID.
+
+ If the (domain_id, server_id) pair already exists, then the new GTID replaces
+ the old one for that domain id. Else a new entry is inserted.
+
+ Returns 0 for ok, 1 for error.
+*/
+int
+rpl_binlog_state::update(const struct rpl_gtid *gtid)
+{
+ rpl_gtid *lookup_gtid;
+ element *elem;
+
+ elem= (element *)my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0);
+ if (elem)
+ {
+ /*
+ By far the most common case is that successive events within same
+ replication domain have the same server id (it changes only when
+ switching to a new master). So save a hash lookup in this case.
+ */
+ if (likely(elem->last_gtid->server_id == gtid->server_id))
+ {
+ elem->last_gtid->seq_no= gtid->seq_no;
+ return 0;
+ }
+
+ lookup_gtid= (rpl_gtid *)
+ my_hash_search(&elem->hash, (const uchar *)&gtid->server_id, 0);
+ if (lookup_gtid)
+ {
+ lookup_gtid->seq_no= gtid->seq_no;
+ elem->last_gtid= lookup_gtid;
+ return 0;
+ }
+
+ /* Allocate a new GTID and insert it. */
+ lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
+ if (!lookup_gtid)
+ return 1;
+ memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
+ if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
+ {
+ my_free(lookup_gtid);
+ return 1;
+ }
+ elem->last_gtid= lookup_gtid;
+ return 0;
+ }
+
+ /* First time we see this domain_id; allocate a new element. */
+ elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
+ lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
+ if (elem && lookup_gtid)
+ {
+ elem->domain_id= gtid->domain_id;
+ my_hash_init(&elem->hash, &my_charset_bin, 32,
+ offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
+ HASH_UNIQUE);
+ elem->last_gtid= lookup_gtid;
+ memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
+ if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
+ {
+ lookup_gtid= NULL; /* Do not free. */
+ if (0 == my_hash_insert(&hash, (const uchar *)elem))
+ return 0;
+ }
+ my_hash_free(&elem->hash);
+ }
+
+ /* An error. */
+ if (elem)
+ my_free(elem);
+ if (lookup_gtid)
+ my_free(lookup_gtid);
+ return 1;
+}
+
+
+uint32
+rpl_binlog_state::seq_no_from_state()
+{
+ ulong i, j;
+ uint64 seq_no= 0;
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ for (j= 0; j < e->hash.records; ++j)
+ {
+ const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
+ if (gtid->seq_no > seq_no)
+ seq_no= gtid->seq_no;
+ }
+ }
+ return seq_no;
+}
+
+
+/*
+ Write binlog state to text file, so we can read it in again without having
+ to scan last binlog file (normal shutdown/startup, not crash recovery).
+
+ The most recent GTID within each domain_id is written after any other GTID
+ within this domain.
+*/
+int
+rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
+{
+ ulong i, j;
+ char buf[21];
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ size_t res;
+ element *e= (element *)my_hash_element(&hash, i);
+ for (j= 0; j <= e->hash.records; ++j)
+ {
+ const rpl_gtid *gtid;
+ if (j < e->hash.records)
+ {
+ gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
+ if (gtid == e->last_gtid)
+ continue;
+ }
+ else
+ gtid= e->last_gtid;
+
+ longlong10_to_str(gtid->seq_no, buf, 10);
+ res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf);
+ if (res == (size_t) -1)
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+
+int
+rpl_binlog_state::read_from_iocache(IO_CACHE *src)
+{
+ /* 10-digit - 10-digit - 20-digit \n \0 */
+ char buf[10+1+10+1+20+1+1];
+ char *p, *end;
+ rpl_gtid gtid;
+
+ reset();
+ for (;;)
+ {
+ size_t res= my_b_gets(src, buf, sizeof(buf));
+ if (!res)
+ break;
+ p= buf;
+ end= buf + res;
+ if (gtid_parser_helper(&p, end, &gtid))
+ return 1;
+ if (update(&gtid))
+ return 1;
+ }
+ return 0;
+}
+
+
+slave_connection_state::slave_connection_state()
+{
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, my_free,
+ HASH_UNIQUE);
+}
+
+
+uint32
+rpl_binlog_state::count()
+{
+ uint32 c= 0;
+ uint32 i;
+
+ for (i= 0; i < hash.records; ++i)
+ c+= ((element *)my_hash_element(&hash, i))->hash.records;
+
+ return c;
+}
+
+
+int
+rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
+{
+ uint32 i, j, pos;
+
+ pos= 0;
+ for (i= 0; i < hash.records; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ for (j= 0; j <= e->hash.records; ++j)
+ {
+ const rpl_gtid *gtid;
+ if (j < e->hash.records)
+ {
+ gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
+ if (gtid == e->last_gtid)
+ continue;
+ }
+ else
+ gtid= e->last_gtid;
+
+ if (pos >= list_size)
+ return 1;
+ memcpy(&gtid_list[pos++], gtid, sizeof(*gtid));
+ }
+ }
+
+ return 0;
+}
+
+
+/*
+ Get a list of the most recently binlogged GTID, for each domain_id.
+
+ This can be used when switching from being a master to being a slave,
+ to know where to start replicating from the new master.
+
+ The returned list must be de-allocated with my_free().
+
+ Returns 0 for ok, non-zero for out-of-memory.
+*/
+int
+rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
+{
+ uint32 i;
+
+ *size= hash.records;
+ if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME))))
+ return 1;
+ for (i= 0; i < *size; ++i)
+ {
+ element *e= (element *)my_hash_element(&hash, i);
+ memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid));
+ }
+
+ return 0;
+}
+
+
+slave_connection_state::~slave_connection_state()
+{
+ my_hash_free(&hash);
+}
+
+
+/*
+ Create a hash from the slave GTID state that is sent to master when slave
+ connects to start replication.
+
+ The state is sent as <GTID>,<GTID>,...,<GTID>, for example:
+
+ 0-2-112,1-4-1022
+
+ The state gives for each domain_id the GTID to start replication from for
+ the corresponding replication stream. So domain_id must be unique.
+
+ Returns 0 if ok, non-zero if error due to malformed input.
+
+ Note that input string is built by slave server, so it will not be incorrect
+ unless bug/corruption/malicious server. So we just need basic sanity check,
+ not fancy user-friendly error message.
+*/
+
+int
+slave_connection_state::load(char *slave_request, size_t len)
+{
+ char *p, *end;
+ uchar *rec;
+ rpl_gtid *gtid;
+ const rpl_gtid *gtid2;
+
+ my_hash_reset(&hash);
+ p= slave_request;
+ end= slave_request + len;
+ if (p == end)
+ return 0;
+ for (;;)
+ {
+ if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*gtid));
+ return 1;
+ }
+ gtid= (rpl_gtid *)rec;
+ if (gtid_parser_helper(&p, end, gtid))
+ {
+ my_free(rec);
+ my_error(ER_INCORRECT_GTID_STATE, MYF(0));
+ return 1;
+ }
+ if ((gtid2= (const rpl_gtid *)
+ my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0)))
+ {
+ my_error(ER_DUPLICATE_GTID_DOMAIN, MYF(0), gtid->domain_id,
+ gtid->server_id, (ulonglong)gtid->seq_no, gtid2->domain_id,
+ gtid2->server_id, (ulonglong)gtid2->seq_no, gtid->domain_id);
+ my_free(rec);
+ return 1;
+ }
+ if (my_hash_insert(&hash, rec))
+ {
+ my_free(rec);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+ if (p == end)
+ break; /* Finished. */
+ if (*p != ',')
+ {
+ my_error(ER_INCORRECT_GTID_STATE, MYF(0));
+ return 1;
+ }
+ ++p;
+ }
+
+ return 0;
+}
+
+
+int
+slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
+{
+ uint32 i;
+
+ my_hash_reset(&hash);
+ for (i= 0; i < count; ++i)
+ if (update(&gtid_list[i]))
+ return 1;
+ return 0;
+}
+
+
+rpl_gtid *
+slave_connection_state::find(uint32 domain_id)
+{
+ return (rpl_gtid *) my_hash_search(&hash, (const uchar *)(&domain_id), 0);
+}
+
+
+int
+slave_connection_state::update(const rpl_gtid *in_gtid)
+{
+ rpl_gtid *new_gtid;
+ uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
+ if (rec)
+ {
+ memcpy(rec, in_gtid, sizeof(*in_gtid));
+ return 0;
+ }
+
+ if (!(new_gtid= (rpl_gtid *)my_malloc(sizeof(*new_gtid), MYF(MY_WME))))
+ return 1;
+ memcpy(new_gtid, in_gtid, sizeof(*new_gtid));
+ if (my_hash_insert(&hash, (uchar *)new_gtid))
+ {
+ my_free(new_gtid);
+ return 1;
+ }
+
+ return 0;
+}
+
+
+void
+slave_connection_state::remove(const rpl_gtid *in_gtid)
+{
+ bool err;
+ uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
+#ifndef DBUG_OFF
+ rpl_gtid *slave_gtid= (rpl_gtid *)rec;
+ DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */);
+ DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
+ DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
+#endif
+
+ err= my_hash_delete(&hash, rec);
+ DBUG_ASSERT(!err);
+}
+
+
+int
+slave_connection_state::to_string(String *out_str)
+{
+ uint32 i;
+ bool first;
+
+ out_str->length(0);
+ first= true;
+ for (i= 0; i < hash.records; ++i)
+ {
+ const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i);
+ if (rpl_slave_state_tostring_helper(out_str, gtid, &first))
+ return 1;
+ }
+ return 0;
+}
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
new file mode 100644
index 00000000000..8cd2c362d68
--- /dev/null
+++ b/sql/rpl_gtid.h
@@ -0,0 +1,170 @@
+/* Copyright (c) 2013, Kristian Nielsen and MariaDB Services Ab.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+#ifndef RPL_GTID_H
+#define RPL_GTID_H
+
+/* Definitions for MariaDB global transaction ID (GTID). */
+
+
+class String;
+
+struct rpl_gtid
+{
+ uint32 domain_id;
+ uint32 server_id;
+ uint64 seq_no;
+};
+
+
+/*
+ Replication slave state.
+
+ For every independent replication stream (identified by domain_id), this
+ remembers the last gtid applied on the slave within this domain.
+
+ Since events are always committed in-order within a single domain, this is
+ sufficient to maintain the state of the replication slave.
+*/
+struct rpl_slave_state
+{
+ /* Elements in the list of GTIDs kept for each domain_id. */
+ struct list_element
+ {
+ struct list_element *next;
+ uint64 sub_id;
+ uint64 seq_no;
+ uint32 server_id;
+ };
+
+ /* Elements in the HASH that hold the state for one domain_id. */
+ struct element
+ {
+ struct list_element *list;
+ uint64 last_sub_id;
+ uint32 domain_id;
+
+ list_element *grab_list() { list_element *l= list; list= NULL; return l; }
+ void add(list_element *l)
+ {
+ l->next= list;
+ list= l;
+ if (last_sub_id < l->sub_id)
+ last_sub_id= l->sub_id;
+ }
+ };
+
+ /* Mapping from domain_id to its element. */
+ HASH hash;
+ /* Mutex protecting access to the state. */
+ mysql_mutex_t LOCK_slave_state;
+
+ bool inited;
+ bool loaded;
+
+ rpl_slave_state();
+ ~rpl_slave_state();
+
+ void init();
+ void deinit();
+ void truncate_hash();
+ ulong count() const { return hash.records; }
+ int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no);
+ int truncate_state_table(THD *thd);
+ int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
+ bool in_transaction);
+ uint64 next_subid(uint32 domain_id);
+ int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra);
+ int load(THD *thd, char *state_from_master, size_t len, bool reset);
+ bool is_empty();
+
+ void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
+ void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
+
+ element *get_element(uint32 domain_id);
+
+ void update_state_hash(uint64 sub_id, rpl_gtid *gtid);
+ int record_and_update_gtid(THD *thd, Relay_log_info *rli);
+};
+
+
+/*
+ Binlog state.
+ This keeps the last GTID written to the binlog for every distinct
+ (domain_id, server_id) pair.
+ This will be logged at the start of the next binlog file as a
+ Gtid_list_log_event; this way, it is easy to find the binlog file
+ containing a gigen GTID, by simply scanning backwards from the newest
+ one until a lower seq_no is found in the Gtid_list_log_event at the
+ start of a binlog for the given domain_id and server_id.
+
+ We also remember the last logged GTID for every domain_id. This is used
+ to know where to start when a master is changed to a slave. As a side
+ effect, it also allows to skip a hash lookup in the very common case of
+ logging a new GTID with same server id as last GTID.
+*/
+struct rpl_binlog_state
+{
+ struct element {
+ uint32 domain_id;
+ HASH hash; /* Containing all server_id for one domain_id */
+ /* The most recent entry in the hash. */
+ rpl_gtid *last_gtid;
+ };
+ /* Mapping from domain_id to collection of elements. */
+ HASH hash;
+ /* Mutex protecting access to the state. */
+ mysql_mutex_t LOCK_binlog_state;
+
+ rpl_binlog_state();
+ ~rpl_binlog_state();
+
+ void reset();
+ int update(const struct rpl_gtid *gtid);
+ uint32 seq_no_from_state();
+ int write_to_iocache(IO_CACHE *dest);
+ int read_from_iocache(IO_CACHE *src);
+ uint32 count();
+ int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
+ int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
+};
+
+
+/*
+ Represent the GTID state that a slave connection to a master requests
+ the master to start sending binlog events from.
+*/
+struct slave_connection_state
+{
+ /* Mapping from domain_id to the GTID requested for that domain. */
+ HASH hash;
+
+ slave_connection_state();
+ ~slave_connection_state();
+
+ int load(char *slave_request, size_t len);
+ int load(const rpl_gtid *gtid_list, uint32 count);
+ rpl_gtid *find(uint32 domain_id);
+ int update(const rpl_gtid *in_gtid);
+ void remove(const rpl_gtid *gtid);
+ ulong count() const { return hash.records; }
+ int to_string(String *out_str);
+};
+
+extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
+ bool *first);
+extern const LEX_STRING rpl_gtid_slave_state_table_name;
+
+#endif /* RPL_GTID_H */
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index c266c7d0b78..18b47045671 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -37,9 +37,6 @@ static int count_relay_log_space(Relay_log_info* rli);
*/
rpl_slave_state rpl_global_gtid_slave_state;
-const LEX_STRING rpl_gtid_slave_state_table_name=
- { STRING_WITH_LEN("rpl_slave_state") };
-
// Defined in slave.cc
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 414baf9b762..c5ab25bcd66 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -592,7 +592,6 @@ private:
int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
-extern const LEX_STRING rpl_gtid_slave_state_table_name;
extern struct rpl_slave_state rpl_global_gtid_slave_state;