diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2017-07-03 09:33:41 +0200 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2017-07-03 09:33:41 +0200 |
commit | 1d91910b944a801a2bbe138d4258c53eaeb0c473 (patch) | |
tree | 76f2ed8b5bb2c9e44eea7cc0366d8d1d7a070966 /sql/rpl_gtid.cc | |
parent | 176000a54ceb8dabe8f8b985aff565dfae6fb0df (diff) | |
parent | 95e09f0766f037530d2dcfbb6c530137a4ee0db4 (diff) | |
download | mariadb-git-1d91910b944a801a2bbe138d4258c53eaeb0c473.tar.gz |
MDEV-12179: Per-engine mysql.gtid_slave_pos table
Merge into MariaDB 10.3.
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r-- | sql/rpl_gtid.cc | 274 |
1 files changed, 235 insertions, 39 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index c385434e41e..fb57babd50f 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -26,6 +26,7 @@ #include "key.h" #include "rpl_gtid.h" #include "rpl_rli.h" +#include "slave.h" const LEX_STRING rpl_gtid_slave_state_table_name= @@ -33,7 +34,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name= void -rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, +rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, void *hton, rpl_group_info *rgi) { int err; @@ -45,7 +46,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, it is even committed. */ mysql_mutex_lock(&LOCK_slave_state); - err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi); + err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, hton, rgi); mysql_mutex_unlock(&LOCK_slave_state); if (err) { @@ -74,12 +75,14 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) if (rgi->gtid_pending) { uint64 sub_id= rgi->gtid_sub_id; + void *hton= NULL; + rgi->gtid_pending= false; if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE) { - if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false)) + if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton)) DBUG_RETURN(1); - update_state_hash(sub_id, &rgi->current_gtid, rgi); + update_state_hash(sub_id, &rgi->current_gtid, hton, rgi); } rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; } @@ -243,7 +246,7 @@ rpl_slave_state_free_element(void *arg) rpl_slave_state::rpl_slave_state() - : last_sub_id(0), loaded(false) + : last_sub_id(0), gtid_pos_tables(0), loaded(false) { mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW); @@ -255,6 +258,7 @@ rpl_slave_state::rpl_slave_state() rpl_slave_state::~rpl_slave_state() { + free_gtid_pos_tables((struct gtid_pos_table *)gtid_pos_tables); truncate_hash(); my_hash_free(&hash); delete_dynamic(>id_sort_array); @@ -286,11 +290,12 @@ rpl_slave_state::truncate_hash() int rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, - uint64 seq_no, rpl_group_info *rgi) + uint64 seq_no, void *hton, rpl_group_info *rgi) { element *elem= NULL; list_element *list_elem= NULL; + DBUG_ASSERT(hton || !loaded); if (!(elem= get_element(domain_id))) return 1; @@ -335,6 +340,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, list_elem->server_id= server_id; list_elem->sub_id= sub_id; list_elem->seq_no= seq_no; + list_elem->hton= hton; elem->add(list_elem); if (last_sub_id < sub_id) @@ -466,6 +472,94 @@ gtid_check_rpl_slave_state_table(TABLE *table) /* + Attempt to find a mysql.gtid_slave_posXXX table that has a storage engine + that is already in use by the current transaction, if any. +*/ +void +rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_STRING *out_tablename) +{ + struct gtid_pos_table *list, *table_entry, *default_entry; + + /* + See comments on rpl_slave_state::gtid_pos_tables for rules around proper + access to the list. + */ + list= (struct gtid_pos_table *) + my_atomic_loadptr_explicit(>id_pos_tables, MY_MEMORY_ORDER_ACQUIRE); + + Ha_trx_info *ha_info; + uint count = 0; + for (ha_info= thd->transaction.all.ha_list; ha_info; ha_info= ha_info->next()) + { + void *trx_hton= ha_info->ht(); + table_entry= list; + + if (!ha_info->is_trx_read_write() || trx_hton == binlog_hton) + continue; + while (table_entry) + { + if (table_entry->table_hton == trx_hton) + { + if (likely(table_entry->state == GTID_POS_AVAILABLE)) + { + *out_tablename= table_entry->table_name; + /* + Check if this is a cross-engine transaction, so we can correctly + maintain the rpl_transactions_multi_engine status variable. + */ + if (count >= 1) + statistic_increment(rpl_transactions_multi_engine, LOCK_status); + else + { + for (;;) + { + ha_info= ha_info->next(); + if (!ha_info) + break; + if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton) + { + statistic_increment(rpl_transactions_multi_engine, LOCK_status); + break; + } + } + } + return; + } + /* + This engine is marked to automatically create the table. + We cannot easily do this here (possibly in the middle of a + transaction). But we can request the slave background thread + to create it, and in a short while it should become available + for following transactions. + */ +#ifdef HAVE_REPLICATION + slave_background_gtid_pos_create_request(table_entry); +#endif + break; + } + table_entry= table_entry->next; + } + ++count; + } + /* + If we cannot find any table whose engine matches an engine that is + already active in the transaction, or if there is no current transaction + engines available, we return the default gtid_slave_pos table. + */ + default_entry= (struct gtid_pos_table *) + my_atomic_loadptr_explicit(&default_gtid_pos_table, MY_MEMORY_ORDER_ACQUIRE); + *out_tablename= default_entry->table_name; + /* Record in status that we failed to find a suitable gtid_pos table. */ + if (count > 0) + { + statistic_increment(transactions_gtid_foreign_engine, LOCK_status); + if (count > 1) + statistic_increment(rpl_transactions_multi_engine, LOCK_status); + } +} + + +/* Write a gtid to the replication slave state table. Do it as part of the transaction, to get slave crash safety, or as a separate @@ -481,19 +575,24 @@ gtid_check_rpl_slave_state_table(TABLE *table) */ int rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, - bool in_transaction, bool in_statement) + bool in_transaction, bool in_statement, + void **out_hton) { TABLE_LIST tlist; int err= 0; bool table_opened= false; TABLE *table; - list_element *elist= 0, *next; + list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr; + uint64_t best_sub_id; element *elem; ulonglong thd_saved_option= thd->variables.option_bits; Query_tables_list lex_backup; wait_for_commit* suspended_wfc; + void *hton= NULL; + LEX_STRING gtid_pos_table_name; DBUG_ENTER("record_gtid"); + *out_hton= NULL; if (unlikely(!loaded)) { /* @@ -508,6 +607,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, if (!in_statement) thd->reset_for_next_command(); + select_gtid_pos_table(thd, >id_pos_table_name); DBUG_EXECUTE_IF("gtid_inject_record_gtid", { @@ -538,14 +638,13 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, */ suspended_wfc= thd->suspend_subsequent_commits(); thd->lex->reset_n_backup_query_tables_list(&lex_backup); - tlist.init_one_table(STRING_WITH_LEN("mysql"), - rpl_gtid_slave_state_table_name.str, - rpl_gtid_slave_state_table_name.length, - NULL, TL_WRITE); + tlist.init_one_table(STRING_WITH_LEN("mysql"), gtid_pos_table_name.str, + gtid_pos_table_name.length, NULL, TL_WRITE); if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) goto end; table_opened= true; table= tlist.table; + hton= table->s->db_type(); if ((err= gtid_check_rpl_slave_state_table(table))) goto end; @@ -581,6 +680,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, table->file->print_error(err, MYF(0)); goto end; } + *out_hton= hton; if(opt_bin_log && (err= mysql_bin_log.bump_seq_no_counter_if_needed(gtid->domain_id, @@ -598,36 +698,62 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, err= 1; goto end; } - if ((elist= elem->grab_list()) != NULL) + + /* Now pull out all GTIDs that were recorded in this engine. */ + delete_list = NULL; + next_ptr_ptr= &elem->list; + cur= elem->list; + best_sub_id= 0; + best_ptr_ptr= NULL; + while (cur) { - /* Delete any old stuff, but keep around the most recent one. */ - list_element *cur= elist; - uint64 best_sub_id= cur->sub_id; - list_element **best_ptr_ptr= &elist; - while ((next= cur->next)) + list_element *next= cur->next; + if (cur->hton == hton) { - if (next->sub_id > best_sub_id) + /* Belongs to same engine, so move it to the delete list. */ + cur->next= delete_list; + delete_list= cur; + if (cur->sub_id > best_sub_id) { - best_sub_id= next->sub_id; + best_sub_id= cur->sub_id; + best_ptr_ptr= &delete_list; + } + else if (best_ptr_ptr == &delete_list) best_ptr_ptr= &cur->next; + } + else + { + /* Another engine, leave it in the list. */ + if (cur->sub_id > best_sub_id) + { + best_sub_id= cur->sub_id; + /* Current best is not on the delete list. */ + best_ptr_ptr= NULL; } - cur= next; + *next_ptr_ptr= cur; + next_ptr_ptr= &cur->next; } - /* - Delete the highest sub_id element from the old list, and put it back as - the single-element new list. - */ + cur= next; + } + *next_ptr_ptr= NULL; + /* + If the highest sub_id element is on the delete list, put it back on the + original list, to preserve the highest sub_id element in the table for + GTID position recovery. + */ + if (best_ptr_ptr) + { cur= *best_ptr_ptr; *best_ptr_ptr= cur->next; - cur->next= NULL; + cur->next= elem->list; elem->list= cur; } mysql_mutex_unlock(&LOCK_slave_state); - if (!elist) + if (!delete_list) goto end; - /* Now delete any already committed rows. */ + /* Now delete any already committed GTIDs. */ bitmap_set_bit(table->read_set, table->field[0]->field_index); bitmap_set_bit(table->read_set, table->field[1]->field_index); @@ -636,7 +762,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, table->file->print_error(err, MYF(0)); goto end; } - while (elist) + while (delete_list) { uchar key_buffer[4+8]; @@ -646,9 +772,9 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, /* `break' does not work inside DBUG_EXECUTE_IF */ goto dbug_break; }); - next= elist->next; + next= delete_list->next; - table->field[1]->store(elist->sub_id, true); + table->field[1]->store(delete_list->sub_id, true); /* domain_id is already set in table->record[0] from write_row() above. */ key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false); if (table->file->ha_index_read_map(table->record[1], key_buffer, @@ -662,8 +788,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, not want to endlessly error on the same element in case of table corruption or such. */ - my_free(elist); - elist= next; + my_free(delete_list); + delete_list= next; if (err) break; } @@ -681,13 +807,13 @@ end: if (err || (err= ha_commit_trans(thd, FALSE))) { /* - If error, we need to put any remaining elist back into the HASH so we - can do another delete attempt later. + If error, we need to put any remaining delete_list back into the HASH + so we can do another delete attempt later. */ - if (elist) + if (delete_list) { mysql_mutex_lock(&LOCK_slave_state); - put_back_list(gtid->domain_id, elist); + put_back_list(gtid->domain_id, delete_list); mysql_mutex_unlock(&LOCK_slave_state); } @@ -1077,11 +1203,12 @@ rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len, { rpl_gtid gtid; uint64 sub_id; + void *hton= NULL; if (gtid_parser_helper(&state_from_master, end, >id) || !(sub_id= next_sub_id(gtid.domain_id)) || - record_gtid(thd, >id, sub_id, false, in_statement) || - update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL)) + record_gtid(thd, >id, sub_id, false, in_statement, &hton) || + update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL)) return 1; if (state_from_master == end) break; @@ -1115,6 +1242,75 @@ rpl_slave_state::is_empty() } +void +rpl_slave_state::free_gtid_pos_tables(struct rpl_slave_state::gtid_pos_table *list) +{ + struct gtid_pos_table *cur, *next; + + cur= list; + while (cur) + { + next= cur->next; + my_free(cur); + cur= next; + } +} + + +/* + Replace the list of available mysql.gtid_slave_posXXX tables with a new list. + The caller must be holding LOCK_slave_state. Additionally, this function + must only be called while all SQL threads are stopped. +*/ +void +rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list, + rpl_slave_state::gtid_pos_table *default_entry) +{ + gtid_pos_table *old_list; + + mysql_mutex_assert_owner(&LOCK_slave_state); + old_list= (struct gtid_pos_table *)gtid_pos_tables; + my_atomic_storeptr_explicit(>id_pos_tables, new_list, MY_MEMORY_ORDER_RELEASE); + my_atomic_storeptr_explicit(&default_gtid_pos_table, default_entry, + MY_MEMORY_ORDER_RELEASE); + free_gtid_pos_tables(old_list); +} + + +void +rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry) +{ + mysql_mutex_assert_owner(&LOCK_slave_state); + entry->next= (struct gtid_pos_table *)gtid_pos_tables; + my_atomic_storeptr_explicit(>id_pos_tables, entry, MY_MEMORY_ORDER_RELEASE); +} + + +struct rpl_slave_state::gtid_pos_table * +rpl_slave_state::alloc_gtid_pos_table(LEX_STRING *table_name, void *hton, + rpl_slave_state::gtid_pos_table_state state) +{ + struct gtid_pos_table *p; + char *allocated_str; + + if (!my_multi_malloc(MYF(MY_WME), + &p, sizeof(*p), + &allocated_str, table_name->length+1, + NULL)) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*p) + table_name->length+1)); + return NULL; + } + memcpy(allocated_str, table_name->str, table_name->length+1); // Also copy '\0' + p->next = NULL; + p->table_hton= hton; + p->table_name.str= allocated_str; + p->table_name.length= table_name->length; + p->state= state; + return p; +} + + void rpl_binlog_state::init() { my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id), |