summaryrefslogtreecommitdiff
path: root/sql/rpl_gtid.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r--sql/rpl_gtid.cc443
1 files changed, 280 insertions, 163 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 8408025d389..88e6447ddcc 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -80,7 +80,7 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
rgi->gtid_pending= false;
if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
{
- if (record_gtid(thd, &rgi->current_gtid, sub_id, NULL, false, &hton))
+ if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton))
DBUG_RETURN(1);
update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
}
@@ -245,7 +245,7 @@ rpl_slave_state_free_element(void *arg)
rpl_slave_state::rpl_slave_state()
- : last_sub_id(0), gtid_pos_tables(0), loaded(false)
+ : pending_gtid_count(0), last_sub_id(0), gtid_pos_tables(0), loaded(false)
{
mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state,
MY_MUTEX_INIT_SLOW);
@@ -257,7 +257,7 @@ rpl_slave_state::rpl_slave_state()
rpl_slave_state::~rpl_slave_state()
{
- free_gtid_pos_tables((struct gtid_pos_table *)gtid_pos_tables);
+ free_gtid_pos_tables(gtid_pos_tables.load(std::memory_order_relaxed));
truncate_hash();
my_hash_free(&hash);
delete_dynamic(&gtid_sort_array);
@@ -332,14 +332,11 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
}
}
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
-
-#ifdef HAVE_REPLICATION
- rgi->pending_gtid_deletes_clear();
-#endif
}
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
+ list_elem->domain_id= domain_id;
list_elem->server_id= server_id;
list_elem->sub_id= sub_id;
list_elem->seq_no= seq_no;
@@ -349,6 +346,15 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
if (last_sub_id < sub_id)
last_sub_id= sub_id;
+#ifdef HAVE_REPLICATION
+ ++pending_gtid_count;
+ if (pending_gtid_count >= opt_gtid_cleanup_batch_size)
+ {
+ pending_gtid_count = 0;
+ slave_background_gtid_pending_delete_request();
+ }
+#endif
+
return 0;
}
@@ -383,20 +389,22 @@ rpl_slave_state::get_element(uint32 domain_id)
int
-rpl_slave_state::put_back_list(uint32 domain_id, list_element *list)
+rpl_slave_state::put_back_list(list_element *list)
{
- element *e;
+ element *e= NULL;
int err= 0;
mysql_mutex_lock(&LOCK_slave_state);
- if (!(e= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
- {
- err= 1;
- goto end;
- }
while (list)
{
list_element *next= list->next;
+
+ if ((!e || e->domain_id != list->domain_id) &&
+ !(e= (element *)my_hash_search(&hash, (const uchar *)&list->domain_id, 0)))
+ {
+ err= 1;
+ goto end;
+ }
e->add(list);
list= next;
}
@@ -489,21 +497,18 @@ gtid_check_rpl_slave_state_table(TABLE *table)
void
rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
{
- struct gtid_pos_table *list, *table_entry, *default_entry;
-
/*
See comments on rpl_slave_state::gtid_pos_tables for rules around proper
access to the list.
*/
- list= (struct gtid_pos_table *)
- my_atomic_loadptr_explicit(&gtid_pos_tables, MY_MEMORY_ORDER_ACQUIRE);
+ auto list= gtid_pos_tables.load(std::memory_order_acquire);
Ha_trx_info *ha_info;
uint count = 0;
for (ha_info= thd->transaction.all.ha_list; ha_info; ha_info= ha_info->next())
{
void *trx_hton= ha_info->ht();
- table_entry= list;
+ auto table_entry= list;
if (!ha_info->is_trx_read_write() || trx_hton == binlog_hton)
continue;
@@ -557,9 +562,8 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
already active in the transaction, or if there is no current transaction
engines available, we return the default gtid_slave_pos table.
*/
- default_entry= (struct gtid_pos_table *)
- my_atomic_loadptr_explicit(&default_gtid_pos_table, MY_MEMORY_ORDER_ACQUIRE);
- *out_tablename= default_entry->table_name;
+ *out_tablename=
+ default_gtid_pos_table.load(std::memory_order_acquire)->table_name;
/* Record in status that we failed to find a suitable gtid_pos table. */
if (count > 0)
{
@@ -573,12 +577,12 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
/*
Write a gtid to the replication slave state table.
+ Do it as part of the transaction, to get slave crash safety, or as a separate
+ transaction if !in_transaction (eg. MyISAM or DDL).
+
gtid The global transaction id for this event group.
sub_id Value allocated within the sub_id when the event group was
read (sub_id must be consistent with commit order in master binlog).
- rgi rpl_group_info context, if we are recording the gtid transactionally
- as part of replicating a transactional event. NULL if called from
- outside of a replicated transaction.
Note that caller must later ensure that the new gtid and sub_id is inserted
into the appropriate HASH element with rpl_slave_state.add(), so that it can
@@ -586,16 +590,13 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
- rpl_group_info *rgi, bool in_statement,
+ bool in_transaction, bool in_statement,
void **out_hton)
{
TABLE_LIST tlist;
int err= 0, not_sql_thread;
bool table_opened= false;
TABLE *table;
- list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr;
- uint64 best_sub_id;
- element *elem;
ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup;
wait_for_commit* suspended_wfc;
@@ -685,7 +686,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
thd->wsrep_ignore_table= true;
#endif
- if (!rgi)
+ if (!in_transaction)
{
DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
thd->variables.option_bits&=
@@ -717,168 +718,287 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
+end:
- mysql_mutex_lock(&LOCK_slave_state);
- if ((elem= get_element(gtid->domain_id)) == NULL)
+#ifdef WITH_WSREP
+ thd->wsrep_ignore_table= false;
+#endif
+
+ if (table_opened)
{
- mysql_mutex_unlock(&LOCK_slave_state);
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
- err= 1;
- goto end;
+ if (err || (err= ha_commit_trans(thd, FALSE)))
+ ha_rollback_trans(thd, FALSE);
+
+ close_thread_tables(thd);
+ if (in_transaction)
+ thd->mdl_context.release_statement_locks();
+ else
+ thd->mdl_context.release_transactional_locks();
}
+ thd->lex->restore_backup_query_tables_list(&lex_backup);
+ thd->variables.option_bits= thd_saved_option;
+ thd->resume_subsequent_commits(suspended_wfc);
+ DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep",
+ {
+ if (gtid->server_id == 100)
+ my_sleep(500000);
+ });
+ DBUG_RETURN(err);
+}
- /* Now pull out all GTIDs that were recorded in this engine. */
- delete_list = NULL;
- next_ptr_ptr= &elem->list;
- cur= elem->list;
- best_sub_id= 0;
- best_ptr_ptr= NULL;
- while (cur)
+
+/*
+ Return a list of all old GTIDs in any mysql.gtid_slave_pos* table that are
+ no longer needed and can be deleted from the table.
+
+ Within each domain, we need to keep around the latest GTID (the one with the
+ highest sub_id), but any others in that domain can be deleted.
+*/
+rpl_slave_state::list_element *
+rpl_slave_state::gtid_grab_pending_delete_list()
+{
+ uint32 i;
+ list_element *full_list;
+
+ mysql_mutex_lock(&LOCK_slave_state);
+ full_list= NULL;
+ for (i= 0; i < hash.records; ++i)
{
- list_element *next= cur->next;
- if (cur->hton == hton)
- {
- /* Belongs to same engine, so move it to the delete list. */
- cur->next= delete_list;
- delete_list= cur;
- if (cur->sub_id > best_sub_id)
+ element *elem= (element *)my_hash_element(&hash, i);
+ list_element *elist= elem->list;
+ list_element *last_elem, **best_ptr_ptr, *cur, *next;
+ uint64 best_sub_id;
+
+ if (!elist)
+ continue; /* Nothing here */
+
+ /* Delete any old stuff, but keep around the most recent one. */
+ cur= elist;
+ best_sub_id= cur->sub_id;
+ best_ptr_ptr= &elist;
+ last_elem= cur;
+ while ((next= cur->next)) {
+ last_elem= next;
+ if (next->sub_id > best_sub_id)
{
- best_sub_id= cur->sub_id;
- best_ptr_ptr= &delete_list;
- }
- else if (best_ptr_ptr == &delete_list)
+ best_sub_id= next->sub_id;
best_ptr_ptr= &cur->next;
- }
- else
- {
- /* Another engine, leave it in the list. */
- if (cur->sub_id > best_sub_id)
- {
- best_sub_id= cur->sub_id;
- /* Current best is not on the delete list. */
- best_ptr_ptr= NULL;
}
- *next_ptr_ptr= cur;
- next_ptr_ptr= &cur->next;
+ cur= next;
}
- cur= next;
- }
- *next_ptr_ptr= NULL;
- /*
- If the highest sub_id element is on the delete list, put it back on the
- original list, to preserve the highest sub_id element in the table for
- GTID position recovery.
- */
- if (best_ptr_ptr)
- {
+ /*
+ Append the new elements to the full list. Note the order is important;
+ we do it here so that we do not break the list if best_sub_id is the
+ last of the new elements.
+ */
+ last_elem->next= full_list;
+ /*
+ Delete the highest sub_id element from the old list, and put it back as
+ the single-element new list.
+ */
cur= *best_ptr_ptr;
*best_ptr_ptr= cur->next;
- cur->next= elem->list;
+ cur->next= NULL;
elem->list= cur;
+
+ /*
+ Collect the full list so far here. Note that elist may have moved if we
+ deleted the first element, so order is again important.
+ */
+ full_list= elist;
}
mysql_mutex_unlock(&LOCK_slave_state);
- if (!delete_list)
- goto end;
+ return full_list;
+}
- /* Now delete any already committed GTIDs. */
- bitmap_set_bit(table->read_set, table->field[0]->field_index);
- bitmap_set_bit(table->read_set, table->field[1]->field_index);
- if ((err= table->file->ha_index_init(0, 0)))
+/* Find the mysql.gtid_slave_posXXX table associated with a given hton. */
+LEX_CSTRING *
+rpl_slave_state::select_gtid_pos_table(void *hton)
+{
+ /*
+ See comments on rpl_slave_state::gtid_pos_tables for rules around proper
+ access to the list.
+ */
+ auto table_entry= gtid_pos_tables.load(std::memory_order_acquire);
+
+ while (table_entry)
{
- table->file->print_error(err, MYF(0));
- goto end;
+ if (table_entry->table_hton == hton)
+ {
+ if (likely(table_entry->state == GTID_POS_AVAILABLE))
+ return &table_entry->table_name;
+ }
+ table_entry= table_entry->next;
}
- cur = delete_list;
- while (cur)
- {
- uchar key_buffer[4+8];
- DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete",
- { err= ENOENT;
- table->file->print_error(err, MYF(0));
- /* `break' does not work inside DBUG_EXECUTE_IF */
- goto dbug_break; });
+ return &default_gtid_pos_table.load(std::memory_order_acquire)->table_name;
+}
+
- next= cur->next;
+void
+rpl_slave_state::gtid_delete_pending(THD *thd,
+ rpl_slave_state::list_element **list_ptr)
+{
+ int err= 0;
+ ulonglong thd_saved_option;
+
+ if (unlikely(!loaded))
+ return;
+
+#ifdef WITH_WSREP
+ /*
+ Updates in slave state table should not be appended to galera transaction
+ writeset.
+ */
+ thd->wsrep_ignore_table= true;
+#endif
+
+ thd_saved_option= thd->variables.option_bits;
+ thd->variables.option_bits&=
+ ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG |
+ OPTION_GTID_BEGIN);
+
+ while (*list_ptr)
+ {
+ LEX_CSTRING *gtid_pos_table_name, *tmp_table_name;
+ Query_tables_list lex_backup;
+ TABLE_LIST tlist;
+ TABLE *table;
+ handler::Table_flags direct_pos= 0;
+ list_element *cur, **cur_ptr_ptr;
+ bool table_opened= false;
+ bool index_inited= false;
+ void *hton= (*list_ptr)->hton;
+
+ thd->reset_for_next_command();
- table->field[1]->store(cur->sub_id, true);
- /* domain_id is already set in table->record[0] from write_row() above. */
- key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
- if (table->file->ha_index_read_map(table->record[1], key_buffer,
- HA_WHOLE_KEY, HA_READ_KEY_EXACT))
- /* We cannot find the row, assume it is already deleted. */
- ;
- else if ((err= table->file->ha_delete_row(table->record[1])))
- table->file->print_error(err, MYF(0));
/*
- In case of error, we still discard the element from the list. We do
- not want to endlessly error on the same element in case of table
- corruption or such.
+ Only the SQL thread can call select_gtid_pos_table without a mutex
+ Other threads needs to use a mutex and take into account that the
+ result may change during execution, so we have to make a copy.
*/
- cur= next;
- if (err)
+ mysql_mutex_lock(&LOCK_slave_state);
+ tmp_table_name= select_gtid_pos_table(hton);
+ gtid_pos_table_name= thd->make_clex_string(tmp_table_name->str,
+ tmp_table_name->length);
+ mysql_mutex_unlock(&LOCK_slave_state);
+ if (!gtid_pos_table_name)
+ {
+ /* Out of memory - we can try again later. */
break;
- }
-IF_DBUG(dbug_break:, )
- table->file->ha_index_end();
+ }
-end:
+ thd->lex->reset_n_backup_query_tables_list(&lex_backup);
+ tlist.init_one_table(&MYSQL_SCHEMA_NAME, gtid_pos_table_name, NULL, TL_WRITE);
+ if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ goto end;
+ table_opened= true;
+ table= tlist.table;
-#ifdef WITH_WSREP
- thd->wsrep_ignore_table= false;
-#endif
+ if ((err= gtid_check_rpl_slave_state_table(table)))
+ goto end;
- if (table_opened)
- {
- if (err || (err= ha_commit_trans(thd, FALSE)))
+ direct_pos= table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION;
+ bitmap_set_all(table->write_set);
+ table->rpl_write_set= table->write_set;
+
+ /* Now delete any already committed GTIDs. */
+ bitmap_set_bit(table->read_set, table->field[0]->field_index);
+ bitmap_set_bit(table->read_set, table->field[1]->field_index);
+
+ if (!direct_pos)
{
- /*
- If error, we need to put any remaining delete_list back into the HASH
- so we can do another delete attempt later.
- */
- if (delete_list)
+ if ((err= table->file->ha_index_init(0, 0)))
{
- put_back_list(gtid->domain_id, delete_list);
- delete_list = 0;
+ table->file->print_error(err, MYF(0));
+ goto end;
}
-
- ha_rollback_trans(thd, FALSE);
+ index_inited= true;
}
- close_thread_tables(thd);
- if (rgi)
+
+ cur = *list_ptr;
+ cur_ptr_ptr = list_ptr;
+ do
{
- thd->mdl_context.release_statement_locks();
+ uchar key_buffer[4+8];
+ list_element *next= cur->next;
+
+ if (cur->hton == hton)
+ {
+ int res;
+
+ table->field[0]->store((ulonglong)cur->domain_id, true);
+ table->field[1]->store(cur->sub_id, true);
+ if (direct_pos)
+ {
+ res= table->file->ha_rnd_pos_by_record(table->record[0]);
+ }
+ else
+ {
+ key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
+ res= table->file->ha_index_read_map(table->record[0], key_buffer,
+ HA_WHOLE_KEY, HA_READ_KEY_EXACT);
+ }
+ DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete",
+ { res= 1;
+ err= ENOENT;
+ sql_print_error("<DEBUG> Error deleting old GTID row");
+ });
+ if (res)
+ /* We cannot find the row, assume it is already deleted. */
+ ;
+ else if ((err= table->file->ha_delete_row(table->record[0])))
+ {
+ sql_print_error("Error deleting old GTID row: %s",
+ thd->get_stmt_da()->message());
+ /*
+ In case of error, we still discard the element from the list. We do
+ not want to endlessly error on the same element in case of table
+ corruption or such.
+ */
+ }
+ *cur_ptr_ptr= next;
+ my_free(cur);
+ }
+ else
+ {
+ /* Leave this one in the list until we get to the table for its hton. */
+ cur_ptr_ptr= &cur->next;
+ }
+ cur= next;
+ if (err)
+ break;
+ } while (cur);
+end:
+ if (table_opened)
+ {
+ DBUG_ASSERT(direct_pos || index_inited || err);
/*
- Save the list of old gtid entries we deleted. If this transaction
- fails later for some reason and is rolled back, the deletion of those
- entries will be rolled back as well, and we will need to put them back
- on the to-be-deleted list so we can re-do the deletion. Otherwise
- redundant rows in mysql.gtid_slave_pos may accumulate if transactions
- are rolled back and retried after record_gtid().
+ Index may not be initialized if there was a failure during
+ 'ha_index_init'. Hence check if index initialization is successful and
+ then invoke ha_index_end(). Ending an index which is not initialized
+ will lead to assert.
*/
-#ifdef HAVE_REPLICATION
- rgi->pending_gtid_deletes_save(gtid->domain_id, delete_list);
-#endif
- }
- else
- {
- thd->release_transactional_locks();
-#ifdef HAVE_REPLICATION
- rpl_group_info::pending_gtid_deletes_free(delete_list);
-#endif
+ if (index_inited)
+ table->file->ha_index_end();
+
+ if (err || (err= ha_commit_trans(thd, FALSE)))
+ ha_rollback_trans(thd, FALSE);
}
+ close_thread_tables(thd);
+ thd->release_transactional_locks();
+ thd->lex->restore_backup_query_tables_list(&lex_backup);
+
+ if (err)
+ break;
}
- thd->lex->restore_backup_query_tables_list(&lex_backup);
thd->variables.option_bits= thd_saved_option;
- thd->resume_subsequent_commits(suspended_wfc);
- DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep",
- {
- if (gtid->server_id == 100)
- my_sleep(500000);
- });
- DBUG_RETURN(err);
+
+#ifdef WITH_WSREP
+ thd->wsrep_ignore_table= false;
+#endif
}
@@ -1252,7 +1372,7 @@ rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len,
if (gtid_parser_helper(&state_from_master, end, &gtid) ||
!(sub_id= next_sub_id(gtid.domain_id)) ||
- record_gtid(thd, &gtid, sub_id, NULL, in_statement, &hton) ||
+ record_gtid(thd, &gtid, sub_id, false, in_statement, &hton) ||
update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
return 1;
if (state_from_master == end)
@@ -1311,13 +1431,10 @@ void
rpl_slave_state::set_gtid_pos_tables_list(rpl_slave_state::gtid_pos_table *new_list,
rpl_slave_state::gtid_pos_table *default_entry)
{
- gtid_pos_table *old_list;
-
mysql_mutex_assert_owner(&LOCK_slave_state);
- old_list= (struct gtid_pos_table *)gtid_pos_tables;
- my_atomic_storeptr_explicit(&gtid_pos_tables, new_list, MY_MEMORY_ORDER_RELEASE);
- my_atomic_storeptr_explicit(&default_gtid_pos_table, default_entry,
- MY_MEMORY_ORDER_RELEASE);
+ auto old_list= gtid_pos_tables.load(std::memory_order_relaxed);
+ gtid_pos_tables.store(new_list, std::memory_order_release);
+ default_gtid_pos_table.store(default_entry, std::memory_order_release);
free_gtid_pos_tables(old_list);
}
@@ -1326,8 +1443,8 @@ void
rpl_slave_state::add_gtid_pos_table(rpl_slave_state::gtid_pos_table *entry)
{
mysql_mutex_assert_owner(&LOCK_slave_state);
- entry->next= (struct gtid_pos_table *)gtid_pos_tables;
- my_atomic_storeptr_explicit(&gtid_pos_tables, entry, MY_MEMORY_ORDER_RELEASE);
+ entry->next= gtid_pos_tables.load(std::memory_order_relaxed);
+ gtid_pos_tables.store(entry, std::memory_order_release);
}