summaryrefslogtreecommitdiff
path: root/sql/rpl_gtid.cc
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2017-07-03 09:33:41 +0200
committerKristian Nielsen <knielsen@knielsen-hq.org>2017-07-03 09:33:41 +0200
commit1d91910b944a801a2bbe138d4258c53eaeb0c473 (patch)
tree76f2ed8b5bb2c9e44eea7cc0366d8d1d7a070966 /sql/rpl_gtid.cc
parent176000a54ceb8dabe8f8b985aff565dfae6fb0df (diff)
parent95e09f0766f037530d2dcfbb6c530137a4ee0db4 (diff)
downloadmariadb-git-1d91910b944a801a2bbe138d4258c53eaeb0c473.tar.gz
MDEV-12179: Per-engine mysql.gtid_slave_pos table
Merge into MariaDB 10.3.
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r--sql/rpl_gtid.cc274
1 files changed, 235 insertions, 39 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index c385434e41e..fb57babd50f 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -26,6 +26,7 @@
#include "key.h"
#include "rpl_gtid.h"
#include "rpl_rli.h"
+#include "slave.h"
const LEX_STRING rpl_gtid_slave_state_table_name=
@@ -33,7 +34,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name=
void
-rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
+rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton,
rpl_group_info *rgi)
{
int err;
@@ -45,7 +46,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
it is even committed.
*/
mysql_mutex_lock(&LOCK_slave_state);
- err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi);
+ err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi);
mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
@@ -74,12 +75,14 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
if (rgi->gtid_pending)
{
uint64 sub_id= rgi->gtid_sub_id;
+ void *hton= NULL;
+
rgi->gtid_pending= false;
if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
{
- if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
+ if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton))
DBUG_RETURN(1);
- update_state_hash(sub_id, &rgi->current_gtid, rgi);
+ update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
}
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
}
@@ -243,7 +246,7 @@ rpl_slave_state_free_element(void *arg)
rpl_slave_state::rpl_slave_state()
- : last_sub_id(0), loaded(false)
+ : last_sub_id(0), gtid_pos_tables(0), loaded(false)
{
mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state,
MY_MUTEX_INIT_SLOW);
@@ -255,6 +258,7 @@ rpl_slave_state::rpl_slave_state()
rpl_slave_state::~rpl_slave_state()
{
+ free_gtid_pos_tables((struct gtid_pos_table *)gtid_pos_tables);
truncate_hash();
my_hash_free(&hash);
delete_dynamic(&gtid_sort_array);
@@ -286,11 +290,12 @@ rpl_slave_state::truncate_hash()
int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
- uint64 seq_no, rpl_group_info *rgi)
+ uint64 seq_no, void *hton, rpl_group_info *rgi)
{
element *elem= NULL;
list_element *list_elem= NULL;
+ DBUG_ASSERT(hton || !loaded);
if (!(elem= get_element(domain_id)))
return 1;
@@ -335,6 +340,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
list_elem->server_id= server_id;
list_elem->sub_id= sub_id;
list_elem->seq_no= seq_no;
+ list_elem->hton= hton;
elem->add(list_elem);
if (last_sub_id < sub_id)
@@ -466,6 +472,94 @@ gtid_check_rpl_slave_state_table(TABLE *table)
/*
+ Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine
+ that is already in use by the current transaction, if any.
+*/
+void
+rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_STRING *out_tablename)
+{
+  /*
+    Result: *out_tablename is always set before returning, either to the name
+    of a table whose engine is already read-write in the current transaction,
+    or to the name of the default table.
+
+    See comments on rpl_slave_state::gtid_pos_tables for rules around proper
+    access to the list.
+  */
+  struct gtid_pos_table *tables_head= (struct gtid_pos_table *)
+    my_atomic_loadptr_explicit(&gtid_pos_tables, MY_MEMORY_ORDER_ACQUIRE);
+
+  /* Number of read-write, non-binlog engines walked past without success. */
+  uint engines_seen= 0;
+
+  Ha_trx_info *trx= thd->transaction.all.ha_list;
+  while (trx)
+  {
+    void *engine= trx->ht();
+
+    /* Only engines that actually modify data (and not the binlog) count. */
+    if (trx->is_trx_read_write() && engine != binlog_hton)
+    {
+      /* Look for the first list entry that lives in this engine. */
+      struct gtid_pos_table *match= tables_head;
+      while (match && match->table_hton != engine)
+        match= match->next;
+
+      if (match)
+      {
+        if (likely(match->state == GTID_POS_AVAILABLE))
+        {
+          *out_tablename= match->table_name;
+          /*
+            Work out whether this is a cross-engine transaction, so that the
+            rpl_transactions_multi_engine status variable is maintained
+            correctly. Either an earlier engine was already counted, or one
+            of the remaining participants is another read-write engine.
+          */
+          bool multi_engine= (engines_seen >= 1);
+          if (!multi_engine)
+          {
+            for (Ha_trx_info *rest= trx->next(); rest; rest= rest->next())
+            {
+              if (rest->is_trx_read_write() && rest->ht() != binlog_hton)
+              {
+                multi_engine= true;
+                break;
+              }
+            }
+          }
+          if (multi_engine)
+            statistic_increment(rpl_transactions_multi_engine, LOCK_status);
+          return;
+        }
+        /*
+          The entry exists but is not yet available: this engine is marked
+          to have its table created automatically. Creating it here
+          (possibly in the middle of a transaction) is not easily done, so
+          instead ask the slave background thread to create it; it should
+          then become available for following transactions.
+        */
+#ifdef HAVE_REPLICATION
+        slave_background_gtid_pos_create_request(match);
+#endif
+      }
+      ++engines_seen;
+    }
+    trx= trx->next();
+  }
+
+  /*
+    No table was found whose engine matches an engine that is already active
+    in the transaction, or there are no engines active in the current
+    transaction at all. Fall back to the default gtid_slave_pos table.
+  */
+  struct gtid_pos_table *fallback= (struct gtid_pos_table *)
+    my_atomic_loadptr_explicit(&default_gtid_pos_table, MY_MEMORY_ORDER_ACQUIRE);
+  *out_tablename= fallback->table_name;
+
+  if (engines_seen == 0)
+    return;
+
+  /* Record in status that we failed to find a suitable gtid_pos table. */
+  statistic_increment(transactions_gtid_foreign_engine, LOCK_status);
+  if (engines_seen > 1)
+    statistic_increment(rpl_transactions_multi_engine, LOCK_status);
+}
+
+
+/*
Write a gtid to the replication slave state table.
Do it as part of the transaction, to get slave crash safety, or as a separate
@@ -481,19 +575,24 @@ gtid_check_rpl_slave_state_table(TABLE *table)
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
- bool in_transaction, bool in_statement)
+ bool in_transaction, bool in_statement,
+ void **out_hton)
{
TABLE_LIST tlist;
int err= 0;
bool table_opened= false;
TABLE *table;
- list_element *elist= 0, *next;
+ list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr;
+ uint64_t best_sub_id;
element *elem;
ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup;
wait_for_commit* suspended_wfc;
+ void *hton= NULL;
+ LEX_STRING gtid_pos_table_name;
DBUG_ENTER("record_gtid");
+ *out_hton= NULL;
if (unlikely(!loaded))
{
/*
@@ -508,6 +607,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
if (!in_statement)
thd->reset_for_next_command();
+ select_gtid_pos_table(thd, &gtid_pos_table_name);
DBUG_EXECUTE_IF("gtid_inject_record_gtid",
{
@@ -538,14 +638,13 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
*/
suspended_wfc= thd->suspend_subsequent_commits();
thd->lex->reset_n_backup_query_tables_list(&lex_backup);
- tlist.init_one_table(STRING_WITH_LEN("mysql"),
- rpl_gtid_slave_state_table_name.str,
- rpl_gtid_slave_state_table_name.length,
- NULL, TL_WRITE);
+ tlist.init_one_table(STRING_WITH_LEN("mysql"), gtid_pos_table_name.str,
+ gtid_pos_table_name.length, NULL, TL_WRITE);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end;
table_opened= true;
table= tlist.table;
+ hton= table->s->db_type();
if ((err= gtid_check_rpl_slave_state_table(table)))
goto end;
@@ -581,6 +680,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table->file->print_error(err, MYF(0));
goto end;
}
+ *out_hton= hton;
if(opt_bin_log &&
(err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id,
@@ -598,36 +698,62 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
err= 1;
goto end;
}
- if ((elist= elem->grab_list()) != NULL)
+
+ /* Now pull out all GTIDs that were recorded in this engine. */
+ delete_list = NULL;
+ next_ptr_ptr= &elem->list;
+ cur= elem->list;
+ best_sub_id= 0;
+ best_ptr_ptr= NULL;
+ while (cur)
{
- /* Delete any old stuff, but keep around the most recent one. */
- list_element *cur= elist;
- uint64 best_sub_id= cur->sub_id;
- list_element **best_ptr_ptr= &elist;
- while ((next= cur->next))
+ list_element *next= cur->next;
+ if (cur->hton == hton)
{
- if (next->sub_id > best_sub_id)
+ /* Belongs to same engine, so move it to the delete list. */
+ cur->next= delete_list;
+ delete_list= cur;
+ if (cur->sub_id > best_sub_id)
{
- best_sub_id= next->sub_id;
+ best_sub_id= cur->sub_id;
+ best_ptr_ptr= &delete_list;
+ }
+ else if (best_ptr_ptr == &delete_list)
best_ptr_ptr= &cur->next;
+ }
+ else
+ {
+ /* Another engine, leave it in the list. */
+ if (cur->sub_id > best_sub_id)
+ {
+ best_sub_id= cur->sub_id;
+ /* Current best is not on the delete list. */
+ best_ptr_ptr= NULL;
}
- cur= next;
+ *next_ptr_ptr= cur;
+ next_ptr_ptr= &cur->next;
}
- /*
- Delete the highest sub_id element from the old list, and put it back as
- the single-element new list.
- */
+ cur= next;
+ }
+ *next_ptr_ptr= NULL;
+ /*
+ If the highest sub_id element is on the delete list, put it back on the
+ original list, to preserve the highest sub_id element in the table for
+ GTID position recovery.
+ */
+ if (best_ptr_ptr)
+ {
cur= *best_ptr_ptr;
*best_ptr_ptr= cur->next;
- cur->next= NULL;
+ cur->next= elem->list;
elem->list= cur;
}
mysql_mutex_unlock(&LOCK_slave_state);
- if (!elist)
+ if (!delete_list)
goto end;
- /* Now delete any already committed rows. */
+ /* Now delete any already committed GTIDs. */
bitmap_set_bit(table->read_set, table->field[0]->field_index);
bitmap_set_bit(table->read_set, table->field[1]->field_index);
@@ -636,7 +762,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table->file->print_error(err, MYF(0));
goto end;
}
- while (elist)
+ while (delete_list)
{
uchar key_buffer[4+8];
@@ -646,9 +772,9 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
/* `break' does not work inside DBUG_EXECUTE_IF */
goto dbug_break; });
- next= elist->next;
+ next= delete_list->next;
- table->field[1]->store(elist->sub_id, true);
+ table->field[1]->store(delete_list->sub_id, true);
/* domain_id is already set in table->record[0] from write_row() above. */
key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
if (table->file->ha_index_read_map(table->record[1], key_buffer,
@@ -662,8 +788,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
not want to endlessly error on the same element in case of table
corruption or such.
*/
- my_free(elist);
- elist= next;
+ my_free(delete_list);
+ delete_list= next;
if (err)
break;
}
@@ -681,13 +807,13 @@ end:
if (err || (err= ha_commit_trans(thd, FALSE)))
{
/*
- If error, we need to put any remaining elist back into the HASH so we
- can do another delete attempt later.
+ If error, we need to put any remaining delete_list back into the HASH
+ so we can do another delete attempt later.
*/
- if (elist)
+ if (delete_list)
{
mysql_mutex_lock(&LOCK_slave_state);
- put_back_list(gtid->domain_id, elist);
+ put_back_list(gtid->domain_id, delete_list);
mysql_mutex_unlock(&LOCK_slave_state);
}
@@ -1077,11 +1203,12 @@ rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len,
{
rpl_gtid gtid;
uint64 sub_id;
+ void *hton= NULL;
if (gtid_parser_helper(&state_from_master, end, &gtid) ||
!(sub_id= next_sub_id(gtid.domain_id)) ||
- record_gtid(thd, &gtid, sub_id, false, in_statement) ||
- update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL))
+ record_gtid(thd, &gtid, sub_id, false, in_statement, &hton) ||
+ update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
return 1;
if (state_from_master == end)
break;
@@ -1115,6 +1242,75 @@ rpl_slave_state::is_empty()
}
+/*
+  Release every entry of a singly-linked list of gtid_pos_table entries by
+  passing each one to my_free(). A NULL list is a no-op.
+*/
+void
+rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *list)
+{
+  for (struct gtid_pos_table *entry= list; entry; )
+  {
+    /* Fetch the link first; the node must not be touched once freed. */
+    struct gtid_pos_table *following= entry->next;
+    my_free(entry);
+    entry= following;
+  }
+}
+
+
+/*
+  Install a new list of available mysql.gtid_slave_posXXX tables, together
+  with the entry to be used as the default table, and free the list that was
+  installed before.
+
+  The caller must be holding LOCK_slave_state (asserted below). Additionally,
+  this function must only be called while all SQL threads are stopped.
+*/
+void
+rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list,
+                                          rpl_slave_state::gtid_pos_table *default_entry)
+{
+  mysql_mutex_assert_owner(&LOCK_slave_state);
+
+  /* Keep hold of the outgoing list so it can be released after the swap. */
+  gtid_pos_table *previous= (struct gtid_pos_table *)gtid_pos_tables;
+
+  my_atomic_storeptr_explicit(&gtid_pos_tables, new_list,
+                              MY_MEMORY_ORDER_RELEASE);
+  my_atomic_storeptr_explicit(&default_gtid_pos_table, default_entry,
+                              MY_MEMORY_ORDER_RELEASE);
+
+  free_gtid_pos_tables(previous);
+}
+
+
+/*
+  Push a single entry onto the front of the list of gtid_pos tables.
+  The caller must be holding LOCK_slave_state (asserted below).
+*/
+void
+rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry)
+{
+  mysql_mutex_assert_owner(&LOCK_slave_state);
+
+  /* Link the new entry in front of the current head before publishing it. */
+  struct gtid_pos_table *current_head= (struct gtid_pos_table *)gtid_pos_tables;
+  entry->next= current_head;
+  my_atomic_storeptr_explicit(&gtid_pos_tables, entry,
+                              MY_MEMORY_ORDER_RELEASE);
+}
+
+
+/*
+  Allocate and fill in a new, unlinked gtid_pos_table entry.
+
+  The entry and a private copy of the table name are obtained from a single
+  my_multi_malloc() call. The name copy includes the terminating '\0', so
+  table_name->str is expected to have one at offset table_name->length.
+
+  Returns the new entry (with next == NULL), or NULL after reporting
+  ER_OUTOFMEMORY if the allocation failed.
+*/
+struct rpl_slave_state::gtid_pos_table *
+rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, void *hton,
+                                      rpl_slave_state::gtid_pos_table_state state)
+{
+  struct gtid_pos_table *entry;
+  char *name_copy;
+  /* Bytes needed for the name, including the terminating '\0'. */
+  const size_t name_bytes= table_name->length + 1;
+
+  if (!my_multi_malloc(MYF(MY_WME),
+                       &entry, sizeof(*entry),
+                       &name_copy, name_bytes,
+                       NULL))
+  {
+    my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*entry) + name_bytes));
+    return NULL;
+  }
+
+  memcpy(name_copy, table_name->str, name_bytes);
+
+  entry->next= NULL;
+  entry->table_hton= hton;
+  entry->table_name.str= name_copy;
+  entry->table_name.length= table_name->length;
+  entry->state= state;
+  return entry;
+}
+
+
void rpl_binlog_state::init()
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),