diff options
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r-- | sql/rpl_gtid.cc | 413 |
1 files changed, 265 insertions, 148 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 322b84130f2..17f474c2acf 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -79,7 +79,7 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) rgi->gtid_pending= false; if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE) { - if (record_gtid(thd, &rgi->current_gtid, sub_id, NULL, false, &hton)) + if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton)) DBUG_RETURN(1); update_state_hash(sub_id, &rgi->current_gtid, hton, rgi); } @@ -244,7 +244,7 @@ rpl_slave_state_free_element(void *arg) rpl_slave_state::rpl_slave_state() - : last_sub_id(0), gtid_pos_tables(0), loaded(false) + : pending_gtid_count(0), last_sub_id(0), gtid_pos_tables(0), loaded(false) { mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW); @@ -331,14 +331,11 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, } } rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; - -#ifdef HAVE_REPLICATION - rgi->pending_gtid_deletes_clear(); -#endif } if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) return 1; + list_elem->domain_id= domain_id; list_elem->server_id= server_id; list_elem->sub_id= sub_id; list_elem->seq_no= seq_no; @@ -348,6 +345,15 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, if (last_sub_id < sub_id) last_sub_id= sub_id; +#ifdef HAVE_REPLICATION + ++pending_gtid_count; + if (pending_gtid_count >= opt_gtid_cleanup_batch_size) + { + pending_gtid_count = 0; + slave_background_gtid_pending_delete_request(); + } +#endif + return 0; } @@ -382,20 +388,22 @@ rpl_slave_state::get_element(uint32 domain_id) int -rpl_slave_state::put_back_list(uint32 domain_id, list_element *list) +rpl_slave_state::put_back_list(list_element *list) { - element *e; + element *e= NULL; int err= 0; mysql_mutex_lock(&LOCK_slave_state); - if (!(e= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0))) - { - err= 1; - goto end; - } while (list) { list_element *next= list->next; + + if ((!e || e->domain_id != list->domain_id) && + !(e= (element *)my_hash_search(&hash, (const uchar *)&list->domain_id, 0))) + { + err= 1; + goto end; + } e->add(list); list= next; } @@ -572,12 +580,12 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename) /* Write a gtid to the replication slave state table. + Do it as part of the transaction, to get slave crash safety, or as a separate + transaction if !in_transaction (eg. MyISAM or DDL). + gtid The global transaction id for this event group. sub_id Value allocated within the sub_id when the event group was read (sub_id must be consistent with commit order in master binlog). - rgi rpl_group_info context, if we are recording the gtid transactionally - as part of replicating a transactional event. NULL if called from - outside of a replicated transaction. Note that caller must later ensure that the new gtid and sub_id is inserted into the appropriate HASH element with rpl_slave_state.add(), so that it can @@ -585,16 +593,13 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename) */ int rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, - rpl_group_info *rgi, bool in_statement, + bool in_transaction, bool in_statement, void **out_hton) { TABLE_LIST tlist; int err= 0, not_sql_thread; bool table_opened= false; TABLE *table; - list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr; - uint64 best_sub_id; - element *elem; ulonglong thd_saved_option= thd->variables.option_bits; Query_tables_list lex_backup; wait_for_commit* suspended_wfc; @@ -684,7 +689,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, thd->wsrep_ignore_table= true; #endif - if (!rgi) + if (!in_transaction) { DBUG_PRINT("info", ("resetting OPTION_BEGIN")); thd->variables.option_bits&= @@ -716,168 +721,280 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, my_error(ER_OUT_OF_RESOURCES, MYF(0)); goto end; } +end: - mysql_mutex_lock(&LOCK_slave_state); - if ((elem= get_element(gtid->domain_id)) == NULL) +#ifdef WITH_WSREP + thd->wsrep_ignore_table= false; +#endif + + if (table_opened) { - mysql_mutex_unlock(&LOCK_slave_state); - my_error(ER_OUT_OF_RESOURCES, MYF(0)); - err= 1; - goto end; + if (err || (err= ha_commit_trans(thd, FALSE))) + ha_rollback_trans(thd, FALSE); + + close_thread_tables(thd); + if (in_transaction) + thd->mdl_context.release_statement_locks(); + else + thd->mdl_context.release_transactional_locks(); } + thd->lex->restore_backup_query_tables_list(&lex_backup); + thd->variables.option_bits= thd_saved_option; + thd->resume_subsequent_commits(suspended_wfc); + DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep", + { + if (gtid->server_id == 100) + my_sleep(500000); + }); + DBUG_RETURN(err); +} - /* Now pull out all GTIDs that were recorded in this engine. */ - delete_list = NULL; - next_ptr_ptr= &elem->list; - cur= elem->list; - best_sub_id= 0; - best_ptr_ptr= NULL; - while (cur) + +/* + Return a list of all old GTIDs in any mysql.gtid_slave_pos* table that are + no longer needed and can be deleted from the table. + + Within each domain, we need to keep around the latest GTID (the one with the + highest sub_id), but any others in that domain can be deleted. +*/ +rpl_slave_state::list_element * +rpl_slave_state::gtid_grab_pending_delete_list() +{ + uint32 i; + list_element *full_list; + + mysql_mutex_lock(&LOCK_slave_state); + full_list= NULL; + for (i= 0; i < hash.records; ++i) { - list_element *next= cur->next; - if (cur->hton == hton) - { - /* Belongs to same engine, so move it to the delete list. */ - cur->next= delete_list; - delete_list= cur; - if (cur->sub_id > best_sub_id) + element *elem= (element *)my_hash_element(&hash, i); + list_element *elist= elem->list; + list_element *last_elem, **best_ptr_ptr, *cur, *next; + uint64 best_sub_id; + + if (!elist) + continue; /* Nothing here */ + + /* Delete any old stuff, but keep around the most recent one. */ + cur= elist; + best_sub_id= cur->sub_id; + best_ptr_ptr= &elist; + last_elem= cur; + while ((next= cur->next)) { + last_elem= next; + if (next->sub_id > best_sub_id) { - best_sub_id= cur->sub_id; - best_ptr_ptr= &delete_list; - } - else if (best_ptr_ptr == &delete_list) + best_sub_id= next->sub_id; best_ptr_ptr= &cur->next; - } - else - { - /* Another engine, leave it in the list. */ - if (cur->sub_id > best_sub_id) - { - best_sub_id= cur->sub_id; - /* Current best is not on the delete list. */ - best_ptr_ptr= NULL; } - *next_ptr_ptr= cur; - next_ptr_ptr= &cur->next; + cur= next; } - cur= next; - } - *next_ptr_ptr= NULL; - /* - If the highest sub_id element is on the delete list, put it back on the - original list, to preserve the highest sub_id element in the table for - GTID position recovery. - */ - if (best_ptr_ptr) - { + /* + Append the new elements to the full list. Note the order is important; + we do it here so that we do not break the list if best_sub_id is the + last of the new elements. + */ + last_elem->next= full_list; + /* + Delete the highest sub_id element from the old list, and put it back as + the single-element new list. + */ cur= *best_ptr_ptr; *best_ptr_ptr= cur->next; - cur->next= elem->list; + cur->next= NULL; elem->list= cur; + + /* + Collect the full list so far here. Note that elist may have moved if we + deleted the first element, so order is again important. + */ + full_list= elist; } mysql_mutex_unlock(&LOCK_slave_state); - if (!delete_list) - goto end; + return full_list; +} + - /* Now delete any already committed GTIDs. */ - bitmap_set_bit(table->read_set, table->field[0]->field_index); - bitmap_set_bit(table->read_set, table->field[1]->field_index); +/* Find the mysql.gtid_slave_posXXX table associated with a given hton. */ +LEX_CSTRING * +rpl_slave_state::select_gtid_pos_table(void *hton) +{ + struct gtid_pos_table *table_entry; - if ((err= table->file->ha_index_init(0, 0))) + /* + See comments on rpl_slave_state::gtid_pos_tables for rules around proper + access to the list. + */ + table_entry= (struct gtid_pos_table *) + my_atomic_loadptr_explicit(>id_pos_tables, MY_MEMORY_ORDER_ACQUIRE); + + while (table_entry) { - table->file->print_error(err, MYF(0)); - goto end; + if (table_entry->table_hton == hton) + { + if (likely(table_entry->state == GTID_POS_AVAILABLE)) + return &table_entry->table_name; + } + table_entry= table_entry->next; } - cur = delete_list; - while (cur) - { - uchar key_buffer[4+8]; - DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete", - { err= ENOENT; - table->file->print_error(err, MYF(0)); - /* `break' does not work inside DBUG_EXECUTE_IF */ - goto dbug_break; }); + table_entry= (struct gtid_pos_table *) + my_atomic_loadptr_explicit(&default_gtid_pos_table, MY_MEMORY_ORDER_ACQUIRE); + return &table_entry->table_name; +} - next= cur->next; - table->field[1]->store(cur->sub_id, true); - /* domain_id is already set in table->record[0] from write_row() above. */ - key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false); - if (table->file->ha_index_read_map(table->record[1], key_buffer, - HA_WHOLE_KEY, HA_READ_KEY_EXACT)) - /* We cannot find the row, assume it is already deleted. */ - ; - else if ((err= table->file->ha_delete_row(table->record[1]))) - table->file->print_error(err, MYF(0)); - /* - In case of error, we still discard the element from the list. We do - not want to endlessly error on the same element in case of table - corruption or such. - */ - cur= next; - if (err) - break; - } -IF_DBUG(dbug_break:, ) - table->file->ha_index_end(); +void +rpl_slave_state::gtid_delete_pending(THD *thd, + rpl_slave_state::list_element **list_ptr) +{ + int err= 0; + ulonglong thd_saved_option; -end: + if (unlikely(!loaded)) + return; #ifdef WITH_WSREP - thd->wsrep_ignore_table= false; + /* + Updates in slave state table should not be appended to galera transaction + writeset. + */ + thd->wsrep_ignore_table= true; #endif - if (table_opened) + thd_saved_option= thd->variables.option_bits; + thd->variables.option_bits&= + ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG | + OPTION_GTID_BEGIN); + + while (*list_ptr) { - if (err || (err= ha_commit_trans(thd, FALSE))) - { - /* - If error, we need to put any remaining delete_list back into the HASH - so we can do another delete attempt later. - */ - if (delete_list) - { - put_back_list(gtid->domain_id, delete_list); - delete_list = 0; - } + LEX_CSTRING *gtid_pos_table_name, *tmp_table_name; + Query_tables_list lex_backup; + TABLE_LIST tlist; + TABLE *table; + handler::Table_flags direct_pos; + list_element *cur, **cur_ptr_ptr; + bool table_opened= false; + void *hton= (*list_ptr)->hton; - ha_rollback_trans(thd, FALSE); + thd->reset_for_next_command(); + + /* + Only the SQL thread can call select_gtid_pos_table without a mutex + Other threads needs to use a mutex and take into account that the + result may change during execution, so we have to make a copy. + */ + mysql_mutex_lock(&LOCK_slave_state); + tmp_table_name= select_gtid_pos_table(hton); + gtid_pos_table_name= thd->make_clex_string(tmp_table_name->str, + tmp_table_name->length); + mysql_mutex_unlock(&LOCK_slave_state); + if (!gtid_pos_table_name) + { + /* Out of memory - we can try again later. */ + break; } - close_thread_tables(thd); - if (rgi) + + thd->lex->reset_n_backup_query_tables_list(&lex_backup); + tlist.init_one_table(&MYSQL_SCHEMA_NAME, gtid_pos_table_name, NULL, TL_WRITE); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; + + if ((err= gtid_check_rpl_slave_state_table(table))) + goto end; + + direct_pos= table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION; + bitmap_set_all(table->write_set); + table->rpl_write_set= table->write_set; + + /* Now delete any already committed GTIDs. */ + bitmap_set_bit(table->read_set, table->field[0]->field_index); + bitmap_set_bit(table->read_set, table->field[1]->field_index); + + if (!direct_pos && (err= table->file->ha_index_init(0, 0))) { - thd->mdl_context.release_statement_locks(); - /* - Save the list of old gtid entries we deleted. If this transaction - fails later for some reason and is rolled back, the deletion of those - entries will be rolled back as well, and we will need to put them back - on the to-be-deleted list so we can re-do the deletion. Otherwise - redundant rows in mysql.gtid_slave_pos may accumulate if transactions - are rolled back and retried after record_gtid(). - */ -#ifdef HAVE_REPLICATION - rgi->pending_gtid_deletes_save(gtid->domain_id, delete_list); -#endif + table->file->print_error(err, MYF(0)); + goto end; } - else + + cur = *list_ptr; + cur_ptr_ptr = list_ptr; + do { - thd->mdl_context.release_transactional_locks(); -#ifdef HAVE_REPLICATION - rpl_group_info::pending_gtid_deletes_free(delete_list); -#endif + uchar key_buffer[4+8]; + list_element *next= cur->next; + + if (cur->hton == hton) + { + int res; + + table->field[0]->store((ulonglong)cur->domain_id, true); + table->field[1]->store(cur->sub_id, true); + if (direct_pos) + { + res= table->file->ha_rnd_pos_by_record(table->record[0]); + } + else + { + key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false); + res= table->file->ha_index_read_map(table->record[0], key_buffer, + HA_WHOLE_KEY, HA_READ_KEY_EXACT); + } + DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete", + { res= 1; + err= ENOENT; + sql_print_error("<DEBUG> Error deleting old GTID row"); + }); + if (res) + /* We cannot find the row, assume it is already deleted. */ + ; + else if ((err= table->file->ha_delete_row(table->record[0]))) + { + sql_print_error("Error deleting old GTID row: %s", + thd->get_stmt_da()->message()); + /* + In case of error, we still discard the element from the list. We do + not want to endlessly error on the same element in case of table + corruption or such. + */ + } + *cur_ptr_ptr= next; + my_free(cur); + } + else + { + /* Leave this one in the list until we get to the table for its hton. */ + cur_ptr_ptr= &cur->next; + } + cur= next; + if (err) + break; + } while (cur); +end: + if (table_opened) + { + if (!direct_pos) + table->file->ha_index_end(); + + if (err || (err= ha_commit_trans(thd, FALSE))) + ha_rollback_trans(thd, FALSE); } + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(); + thd->lex->restore_backup_query_tables_list(&lex_backup); + + if (err) + break; } - thd->lex->restore_backup_query_tables_list(&lex_backup); thd->variables.option_bits= thd_saved_option; - thd->resume_subsequent_commits(suspended_wfc); - DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep", - { - if (gtid->server_id == 100) - my_sleep(500000); - }); - DBUG_RETURN(err); + +#ifdef WITH_WSREP + thd->wsrep_ignore_table= false; +#endif } @@ -1251,7 +1368,7 @@ rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len, if (gtid_parser_helper(&state_from_master, end, >id) || !(sub_id= next_sub_id(gtid.domain_id)) || - record_gtid(thd, >id, sub_id, NULL, in_statement, &hton) || + record_gtid(thd, >id, sub_id, false, in_statement, &hton) || update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL)) return 1; if (state_from_master == end) |