summary | refs | log | tree | commit | diff
path: root/sql/rpl_gtid.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r-- sql/rpl_gtid.cc 413
1 files changed, 265 insertions, 148 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 322b84130f2..17f474c2acf 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -79,7 +79,7 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
rgi->gtid_pending= false;
if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
{
- if (record_gtid(thd, &rgi->current_gtid, sub_id, NULL, false, &hton))
+ if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton))
DBUG_RETURN(1);
update_state_hash(sub_id, &rgi->current_gtid, hton, rgi);
}
@@ -244,7 +244,7 @@ rpl_slave_state_free_element(void *arg)
rpl_slave_state::rpl_slave_state()
- : last_sub_id(0), gtid_pos_tables(0), loaded(false)
+ : pending_gtid_count(0), last_sub_id(0), gtid_pos_tables(0), loaded(false)
{
mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state,
MY_MUTEX_INIT_SLOW);
@@ -331,14 +331,11 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
}
}
rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
-
-#ifdef HAVE_REPLICATION
- rgi->pending_gtid_deletes_clear();
-#endif
}
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
+ list_elem->domain_id= domain_id;
list_elem->server_id= server_id;
list_elem->sub_id= sub_id;
list_elem->seq_no= seq_no;
@@ -348,6 +345,15 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
if (last_sub_id < sub_id)
last_sub_id= sub_id;
+#ifdef HAVE_REPLICATION
+ ++pending_gtid_count;
+ if (pending_gtid_count >= opt_gtid_cleanup_batch_size)
+ {
+ pending_gtid_count = 0;
+ slave_background_gtid_pending_delete_request();
+ }
+#endif
+
return 0;
}
@@ -382,20 +388,22 @@ rpl_slave_state::get_element(uint32 domain_id)
int
-rpl_slave_state::put_back_list(uint32 domain_id, list_element *list)
+rpl_slave_state::put_back_list(list_element *list)
{
- element *e;
+ element *e= NULL;
int err= 0;
mysql_mutex_lock(&LOCK_slave_state);
- if (!(e= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
- {
- err= 1;
- goto end;
- }
while (list)
{
list_element *next= list->next;
+
+ if ((!e || e->domain_id != list->domain_id) &&
+ !(e= (element *)my_hash_search(&hash, (const uchar *)&list->domain_id, 0)))
+ {
+ err= 1;
+ goto end;
+ }
e->add(list);
list= next;
}
@@ -572,12 +580,12 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
/*
Write a gtid to the replication slave state table.
+ Do it as part of the transaction, to get slave crash safety, or as a separate
+ transaction if !in_transaction (e.g. MyISAM or DDL).
+
gtid The global transaction id for this event group.
sub_id Value allocated within the sub_id when the event group was
read (sub_id must be consistent with commit order in master binlog).
- rgi rpl_group_info context, if we are recording the gtid transactionally
- as part of replicating a transactional event. NULL if called from
- outside of a replicated transaction.
Note that caller must later ensure that the new gtid and sub_id is inserted
into the appropriate HASH element with rpl_slave_state.add(), so that it can
@@ -585,16 +593,13 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename)
*/
int
rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
- rpl_group_info *rgi, bool in_statement,
+ bool in_transaction, bool in_statement,
void **out_hton)
{
TABLE_LIST tlist;
int err= 0, not_sql_thread;
bool table_opened= false;
TABLE *table;
- list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr;
- uint64 best_sub_id;
- element *elem;
ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup;
wait_for_commit* suspended_wfc;
@@ -684,7 +689,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
thd->wsrep_ignore_table= true;
#endif
- if (!rgi)
+ if (!in_transaction)
{
DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
thd->variables.option_bits&=
@@ -716,168 +721,280 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
+end:
- mysql_mutex_lock(&LOCK_slave_state);
- if ((elem= get_element(gtid->domain_id)) == NULL)
+#ifdef WITH_WSREP
+ thd->wsrep_ignore_table= false;
+#endif
+
+ if (table_opened)
{
- mysql_mutex_unlock(&LOCK_slave_state);
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
- err= 1;
- goto end;
+ if (err || (err= ha_commit_trans(thd, FALSE)))
+ ha_rollback_trans(thd, FALSE);
+
+ close_thread_tables(thd);
+ if (in_transaction)
+ thd->mdl_context.release_statement_locks();
+ else
+ thd->mdl_context.release_transactional_locks();
}
+ thd->lex->restore_backup_query_tables_list(&lex_backup);
+ thd->variables.option_bits= thd_saved_option;
+ thd->resume_subsequent_commits(suspended_wfc);
+ DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep",
+ {
+ if (gtid->server_id == 100)
+ my_sleep(500000);
+ });
+ DBUG_RETURN(err);
+}
- /* Now pull out all GTIDs that were recorded in this engine. */
- delete_list = NULL;
- next_ptr_ptr= &elem->list;
- cur= elem->list;
- best_sub_id= 0;
- best_ptr_ptr= NULL;
- while (cur)
+
+/*
+ Return a list of all old GTIDs in any mysql.gtid_slave_pos* table that are
+ no longer needed and can be deleted from the table.
+
+ Within each domain, we need to keep around the latest GTID (the one with the
+ highest sub_id), but any others in that domain can be deleted.
+*/
+rpl_slave_state::list_element *
+rpl_slave_state::gtid_grab_pending_delete_list()
+{
+ uint32 i;
+ list_element *full_list;
+
+ mysql_mutex_lock(&LOCK_slave_state);
+ full_list= NULL;
+ for (i= 0; i < hash.records; ++i)
{
- list_element *next= cur->next;
- if (cur->hton == hton)
- {
- /* Belongs to same engine, so move it to the delete list. */
- cur->next= delete_list;
- delete_list= cur;
- if (cur->sub_id > best_sub_id)
+ element *elem= (element *)my_hash_element(&hash, i);
+ list_element *elist= elem->list;
+ list_element *last_elem, **best_ptr_ptr, *cur, *next;
+ uint64 best_sub_id;
+
+ if (!elist)
+ continue; /* Nothing here */
+
+ /* Delete any old stuff, but keep around the most recent one. */
+ cur= elist;
+ best_sub_id= cur->sub_id;
+ best_ptr_ptr= &elist;
+ last_elem= cur;
+ while ((next= cur->next)) {
+ last_elem= next;
+ if (next->sub_id > best_sub_id)
{
- best_sub_id= cur->sub_id;
- best_ptr_ptr= &delete_list;
- }
- else if (best_ptr_ptr == &delete_list)
+ best_sub_id= next->sub_id;
best_ptr_ptr= &cur->next;
- }
- else
- {
- /* Another engine, leave it in the list. */
- if (cur->sub_id > best_sub_id)
- {
- best_sub_id= cur->sub_id;
- /* Current best is not on the delete list. */
- best_ptr_ptr= NULL;
}
- *next_ptr_ptr= cur;
- next_ptr_ptr= &cur->next;
+ cur= next;
}
- cur= next;
- }
- *next_ptr_ptr= NULL;
- /*
- If the highest sub_id element is on the delete list, put it back on the
- original list, to preserve the highest sub_id element in the table for
- GTID position recovery.
- */
- if (best_ptr_ptr)
- {
+ /*
+ Append the new elements to the full list. Note the order is important;
+ we do it here so that we do not break the list if best_sub_id is the
+ last of the new elements.
+ */
+ last_elem->next= full_list;
+ /*
+ Delete the highest sub_id element from the old list, and put it back as
+ the single-element new list.
+ */
cur= *best_ptr_ptr;
*best_ptr_ptr= cur->next;
- cur->next= elem->list;
+ cur->next= NULL;
elem->list= cur;
+
+ /*
+ Collect the full list so far here. Note that elist may have moved if we
+ deleted the first element, so order is again important.
+ */
+ full_list= elist;
}
mysql_mutex_unlock(&LOCK_slave_state);
- if (!delete_list)
- goto end;
+ return full_list;
+}
+
- /* Now delete any already committed GTIDs. */
- bitmap_set_bit(table->read_set, table->field[0]->field_index);
- bitmap_set_bit(table->read_set, table->field[1]->field_index);
+/* Find the mysql.gtid_slave_posXXX table associated with a given hton. */
+LEX_CSTRING *
+rpl_slave_state::select_gtid_pos_table(void *hton)
+{
+ struct gtid_pos_table *table_entry;
- if ((err= table->file->ha_index_init(0, 0)))
+ /*
+ See comments on rpl_slave_state::gtid_pos_tables for rules around proper
+ access to the list.
+ */
+ table_entry= (struct gtid_pos_table *)
+ my_atomic_loadptr_explicit(&gtid_pos_tables, MY_MEMORY_ORDER_ACQUIRE);
+
+ while (table_entry)
{
- table->file->print_error(err, MYF(0));
- goto end;
+ if (table_entry->table_hton == hton)
+ {
+ if (likely(table_entry->state == GTID_POS_AVAILABLE))
+ return &table_entry->table_name;
+ }
+ table_entry= table_entry->next;
}
- cur = delete_list;
- while (cur)
- {
- uchar key_buffer[4+8];
- DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete",
- { err= ENOENT;
- table->file->print_error(err, MYF(0));
- /* `break' does not work inside DBUG_EXECUTE_IF */
- goto dbug_break; });
+ table_entry= (struct gtid_pos_table *)
+ my_atomic_loadptr_explicit(&default_gtid_pos_table, MY_MEMORY_ORDER_ACQUIRE);
+ return &table_entry->table_name;
+}
- next= cur->next;
- table->field[1]->store(cur->sub_id, true);
- /* domain_id is already set in table->record[0] from write_row() above. */
- key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
- if (table->file->ha_index_read_map(table->record[1], key_buffer,
- HA_WHOLE_KEY, HA_READ_KEY_EXACT))
- /* We cannot find the row, assume it is already deleted. */
- ;
- else if ((err= table->file->ha_delete_row(table->record[1])))
- table->file->print_error(err, MYF(0));
- /*
- In case of error, we still discard the element from the list. We do
- not want to endlessly error on the same element in case of table
- corruption or such.
- */
- cur= next;
- if (err)
- break;
- }
-IF_DBUG(dbug_break:, )
- table->file->ha_index_end();
+void
+rpl_slave_state::gtid_delete_pending(THD *thd,
+ rpl_slave_state::list_element **list_ptr)
+{
+ int err= 0;
+ ulonglong thd_saved_option;
-end:
+ if (unlikely(!loaded))
+ return;
#ifdef WITH_WSREP
- thd->wsrep_ignore_table= false;
+ /*
+ Updates in slave state table should not be appended to galera transaction
+ writeset.
+ */
+ thd->wsrep_ignore_table= true;
#endif
- if (table_opened)
+ thd_saved_option= thd->variables.option_bits;
+ thd->variables.option_bits&=
+ ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG |
+ OPTION_GTID_BEGIN);
+
+ while (*list_ptr)
{
- if (err || (err= ha_commit_trans(thd, FALSE)))
- {
- /*
- If error, we need to put any remaining delete_list back into the HASH
- so we can do another delete attempt later.
- */
- if (delete_list)
- {
- put_back_list(gtid->domain_id, delete_list);
- delete_list = 0;
- }
+ LEX_CSTRING *gtid_pos_table_name, *tmp_table_name;
+ Query_tables_list lex_backup;
+ TABLE_LIST tlist;
+ TABLE *table;
+ handler::Table_flags direct_pos;
+ list_element *cur, **cur_ptr_ptr;
+ bool table_opened= false;
+ void *hton= (*list_ptr)->hton;
- ha_rollback_trans(thd, FALSE);
+ thd->reset_for_next_command();
+
+ /*
+ Only the SQL thread can call select_gtid_pos_table without a mutex.
+ Other threads need to use a mutex and take into account that the
+ result may change during execution, so we have to make a copy.
+ */
+ mysql_mutex_lock(&LOCK_slave_state);
+ tmp_table_name= select_gtid_pos_table(hton);
+ gtid_pos_table_name= thd->make_clex_string(tmp_table_name->str,
+ tmp_table_name->length);
+ mysql_mutex_unlock(&LOCK_slave_state);
+ if (!gtid_pos_table_name)
+ {
+ /* Out of memory - we can try again later. */
+ break;
}
- close_thread_tables(thd);
- if (rgi)
+
+ thd->lex->reset_n_backup_query_tables_list(&lex_backup);
+ tlist.init_one_table(&MYSQL_SCHEMA_NAME, gtid_pos_table_name, NULL, TL_WRITE);
+ if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ goto end;
+ table_opened= true;
+ table= tlist.table;
+
+ if ((err= gtid_check_rpl_slave_state_table(table)))
+ goto end;
+
+ direct_pos= table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION;
+ bitmap_set_all(table->write_set);
+ table->rpl_write_set= table->write_set;
+
+ /* Now delete any already committed GTIDs. */
+ bitmap_set_bit(table->read_set, table->field[0]->field_index);
+ bitmap_set_bit(table->read_set, table->field[1]->field_index);
+
+ if (!direct_pos && (err= table->file->ha_index_init(0, 0)))
{
- thd->mdl_context.release_statement_locks();
- /*
- Save the list of old gtid entries we deleted. If this transaction
- fails later for some reason and is rolled back, the deletion of those
- entries will be rolled back as well, and we will need to put them back
- on the to-be-deleted list so we can re-do the deletion. Otherwise
- redundant rows in mysql.gtid_slave_pos may accumulate if transactions
- are rolled back and retried after record_gtid().
- */
-#ifdef HAVE_REPLICATION
- rgi->pending_gtid_deletes_save(gtid->domain_id, delete_list);
-#endif
+ table->file->print_error(err, MYF(0));
+ goto end;
}
- else
+
+ cur = *list_ptr;
+ cur_ptr_ptr = list_ptr;
+ do
{
- thd->mdl_context.release_transactional_locks();
-#ifdef HAVE_REPLICATION
- rpl_group_info::pending_gtid_deletes_free(delete_list);
-#endif
+ uchar key_buffer[4+8];
+ list_element *next= cur->next;
+
+ if (cur->hton == hton)
+ {
+ int res;
+
+ table->field[0]->store((ulonglong)cur->domain_id, true);
+ table->field[1]->store(cur->sub_id, true);
+ if (direct_pos)
+ {
+ res= table->file->ha_rnd_pos_by_record(table->record[0]);
+ }
+ else
+ {
+ key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false);
+ res= table->file->ha_index_read_map(table->record[0], key_buffer,
+ HA_WHOLE_KEY, HA_READ_KEY_EXACT);
+ }
+ DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete",
+ { res= 1;
+ err= ENOENT;
+ sql_print_error("<DEBUG> Error deleting old GTID row");
+ });
+ if (res)
+ /* We cannot find the row, assume it is already deleted. */
+ ;
+ else if ((err= table->file->ha_delete_row(table->record[0])))
+ {
+ sql_print_error("Error deleting old GTID row: %s",
+ thd->get_stmt_da()->message());
+ /*
+ In case of error, we still discard the element from the list. We do
+ not want to endlessly error on the same element in case of table
+ corruption or such.
+ */
+ }
+ *cur_ptr_ptr= next;
+ my_free(cur);
+ }
+ else
+ {
+ /* Leave this one in the list until we get to the table for its hton. */
+ cur_ptr_ptr= &cur->next;
+ }
+ cur= next;
+ if (err)
+ break;
+ } while (cur);
+end:
+ if (table_opened)
+ {
+ if (!direct_pos)
+ table->file->ha_index_end();
+
+ if (err || (err= ha_commit_trans(thd, FALSE)))
+ ha_rollback_trans(thd, FALSE);
}
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ thd->lex->restore_backup_query_tables_list(&lex_backup);
+
+ if (err)
+ break;
}
- thd->lex->restore_backup_query_tables_list(&lex_backup);
thd->variables.option_bits= thd_saved_option;
- thd->resume_subsequent_commits(suspended_wfc);
- DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep",
- {
- if (gtid->server_id == 100)
- my_sleep(500000);
- });
- DBUG_RETURN(err);
+
+#ifdef WITH_WSREP
+ thd->wsrep_ignore_table= false;
+#endif
}
@@ -1251,7 +1368,7 @@ rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len,
if (gtid_parser_helper(&state_from_master, end, &gtid) ||
!(sub_id= next_sub_id(gtid.domain_id)) ||
- record_gtid(thd, &gtid, sub_id, NULL, in_statement, &hton) ||
+ record_gtid(thd, &gtid, sub_id, false, in_statement, &hton) ||
update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL))
return 1;
if (state_from_master == end)