diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2018-10-14 20:41:49 +0200 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2018-12-07 07:10:40 +0100 |
commit | 34f11b06e6aa5e8d4e648c04b5a4049179b66cfd (patch) | |
tree | 6731ebc08eb861b1d80eaf45ffbe7734b9afa104 /sql/rpl_gtid.cc | |
parent | 24a45d3bd735f0ab09c1b6d313015022e08c8b39 (diff) | |
download | mariadb-git-34f11b06e6aa5e8d4e648c04b5a4049179b66cfd.tar.gz |
Move deletion of old GTID rows to slave background thread
This patch changes how old rows in mysql.gtid_slave_pos* tables are deleted.
Instead of doing it as part of every replicated transaction in
record_gtid(), it is done periodically (every @@gtid_cleanup_batch_size
transaction) in the slave background thread.
This removes the deletion step from the replication process in SQL or worker
threads, which could speed up replication with many small transactions. It
also decreases contention on the global mutex LOCK_slave_state. And it
simplifies the logic, eg. when a replicated transaction fails after having
deleted old rows.
With this patch, the deletion of old GTID rows happens asynchroneously and
slightly non-deterministic. Thus the number of old rows in
mysql.gtid_slave_pos can temporarily exceed @@gtid_cleanup_batch_size. But
all old rows will be deleted eventually after sufficiently many new GTIDs
have been replicated.
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r-- | sql/rpl_gtid.cc | 413 |
1 files changed, 265 insertions, 148 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 322b84130f2..17f474c2acf 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -79,7 +79,7 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) rgi->gtid_pending= false; if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE) { - if (record_gtid(thd, &rgi->current_gtid, sub_id, NULL, false, &hton)) + if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false, &hton)) DBUG_RETURN(1); update_state_hash(sub_id, &rgi->current_gtid, hton, rgi); } @@ -244,7 +244,7 @@ rpl_slave_state_free_element(void *arg) rpl_slave_state::rpl_slave_state() - : last_sub_id(0), gtid_pos_tables(0), loaded(false) + : pending_gtid_count(0), last_sub_id(0), gtid_pos_tables(0), loaded(false) { mysql_mutex_init(key_LOCK_slave_state, &LOCK_slave_state, MY_MUTEX_INIT_SLOW); @@ -331,14 +331,11 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, } } rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; - -#ifdef HAVE_REPLICATION - rgi->pending_gtid_deletes_clear(); -#endif } if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) return 1; + list_elem->domain_id= domain_id; list_elem->server_id= server_id; list_elem->sub_id= sub_id; list_elem->seq_no= seq_no; @@ -348,6 +345,15 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, if (last_sub_id < sub_id) last_sub_id= sub_id; +#ifdef HAVE_REPLICATION + ++pending_gtid_count; + if (pending_gtid_count >= opt_gtid_cleanup_batch_size) + { + pending_gtid_count = 0; + slave_background_gtid_pending_delete_request(); + } +#endif + return 0; } @@ -382,20 +388,22 @@ rpl_slave_state::get_element(uint32 domain_id) int -rpl_slave_state::put_back_list(uint32 domain_id, list_element *list) +rpl_slave_state::put_back_list(list_element *list) { - element *e; + element *e= NULL; int err= 0; mysql_mutex_lock(&LOCK_slave_state); - if (!(e= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0))) - { - err= 1; - goto end; - } while (list) { list_element *next= list->next; + + if ((!e || e->domain_id != list->domain_id) && + !(e= (element *)my_hash_search(&hash, (const uchar *)&list->domain_id, 0))) + { + err= 1; + goto end; + } e->add(list); list= next; } @@ -572,12 +580,12 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename) /* Write a gtid to the replication slave state table. + Do it as part of the transaction, to get slave crash safety, or as a separate + transaction if !in_transaction (eg. MyISAM or DDL). + gtid The global transaction id for this event group. sub_id Value allocated within the sub_id when the event group was read (sub_id must be consistent with commit order in master binlog). - rgi rpl_group_info context, if we are recording the gtid transactionally - as part of replicating a transactional event. NULL if called from - outside of a replicated transaction. Note that caller must later ensure that the new gtid and sub_id is inserted into the appropriate HASH element with rpl_slave_state.add(), so that it can @@ -585,16 +593,13 @@ rpl_slave_state::select_gtid_pos_table(THD *thd, LEX_CSTRING *out_tablename) */ int rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, - rpl_group_info *rgi, bool in_statement, + bool in_transaction, bool in_statement, void **out_hton) { TABLE_LIST tlist; int err= 0, not_sql_thread; bool table_opened= false; TABLE *table; - list_element *delete_list= 0, *next, *cur, **next_ptr_ptr, **best_ptr_ptr; - uint64 best_sub_id; - element *elem; ulonglong thd_saved_option= thd->variables.option_bits; Query_tables_list lex_backup; wait_for_commit* suspended_wfc; @@ -684,7 +689,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, thd->wsrep_ignore_table= true; #endif - if (!rgi) + if (!in_transaction) { DBUG_PRINT("info", ("resetting OPTION_BEGIN")); thd->variables.option_bits&= @@ -716,168 +721,280 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, my_error(ER_OUT_OF_RESOURCES, MYF(0)); goto end; } +end: - mysql_mutex_lock(&LOCK_slave_state); - if ((elem= get_element(gtid->domain_id)) == NULL) +#ifdef WITH_WSREP + thd->wsrep_ignore_table= false; +#endif + + if (table_opened) { - mysql_mutex_unlock(&LOCK_slave_state); - my_error(ER_OUT_OF_RESOURCES, MYF(0)); - err= 1; - goto end; + if (err || (err= ha_commit_trans(thd, FALSE))) + ha_rollback_trans(thd, FALSE); + + close_thread_tables(thd); + if (in_transaction) + thd->mdl_context.release_statement_locks(); + else + thd->mdl_context.release_transactional_locks(); } + thd->lex->restore_backup_query_tables_list(&lex_backup); + thd->variables.option_bits= thd_saved_option; + thd->resume_subsequent_commits(suspended_wfc); + DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep", + { + if (gtid->server_id == 100) + my_sleep(500000); + }); + DBUG_RETURN(err); +} - /* Now pull out all GTIDs that were recorded in this engine. */ - delete_list = NULL; - next_ptr_ptr= &elem->list; - cur= elem->list; - best_sub_id= 0; - best_ptr_ptr= NULL; - while (cur) + +/* + Return a list of all old GTIDs in any mysql.gtid_slave_pos* table that are + no longer needed and can be deleted from the table. + + Within each domain, we need to keep around the latest GTID (the one with the + highest sub_id), but any others in that domain can be deleted. +*/ +rpl_slave_state::list_element * +rpl_slave_state::gtid_grab_pending_delete_list() +{ + uint32 i; + list_element *full_list; + + mysql_mutex_lock(&LOCK_slave_state); + full_list= NULL; + for (i= 0; i < hash.records; ++i) { - list_element *next= cur->next; - if (cur->hton == hton) - { - /* Belongs to same engine, so move it to the delete list. */ - cur->next= delete_list; - delete_list= cur; - if (cur->sub_id > best_sub_id) + element *elem= (element *)my_hash_element(&hash, i); + list_element *elist= elem->list; + list_element *last_elem, **best_ptr_ptr, *cur, *next; + uint64 best_sub_id; + + if (!elist) + continue; /* Nothing here */ + + /* Delete any old stuff, but keep around the most recent one. */ + cur= elist; + best_sub_id= cur->sub_id; + best_ptr_ptr= &elist; + last_elem= cur; + while ((next= cur->next)) { + last_elem= next; + if (next->sub_id > best_sub_id) { - best_sub_id= cur->sub_id; - best_ptr_ptr= &delete_list; - } - else if (best_ptr_ptr == &delete_list) + best_sub_id= next->sub_id; best_ptr_ptr= &cur->next; - } - else - { - /* Another engine, leave it in the list. */ - if (cur->sub_id > best_sub_id) - { - best_sub_id= cur->sub_id; - /* Current best is not on the delete list. */ - best_ptr_ptr= NULL; } - *next_ptr_ptr= cur; - next_ptr_ptr= &cur->next; + cur= next; } - cur= next; - } - *next_ptr_ptr= NULL; - /* - If the highest sub_id element is on the delete list, put it back on the - original list, to preserve the highest sub_id element in the table for - GTID position recovery. - */ - if (best_ptr_ptr) - { + /* + Append the new elements to the full list. Note the order is important; + we do it here so that we do not break the list if best_sub_id is the + last of the new elements. + */ + last_elem->next= full_list; + /* + Delete the highest sub_id element from the old list, and put it back as + the single-element new list. + */ cur= *best_ptr_ptr; *best_ptr_ptr= cur->next; - cur->next= elem->list; + cur->next= NULL; elem->list= cur; + + /* + Collect the full list so far here. Note that elist may have moved if we + deleted the first element, so order is again important. + */ + full_list= elist; } mysql_mutex_unlock(&LOCK_slave_state); - if (!delete_list) - goto end; + return full_list; +} + - /* Now delete any already committed GTIDs. */ - bitmap_set_bit(table->read_set, table->field[0]->field_index); - bitmap_set_bit(table->read_set, table->field[1]->field_index); +/* Find the mysql.gtid_slave_posXXX table associated with a given hton. */ +LEX_CSTRING * +rpl_slave_state::select_gtid_pos_table(void *hton) +{ + struct gtid_pos_table *table_entry; - if ((err= table->file->ha_index_init(0, 0))) + /* + See comments on rpl_slave_state::gtid_pos_tables for rules around proper + access to the list. + */ + table_entry= (struct gtid_pos_table *) + my_atomic_loadptr_explicit(>id_pos_tables, MY_MEMORY_ORDER_ACQUIRE); + + while (table_entry) { - table->file->print_error(err, MYF(0)); - goto end; + if (table_entry->table_hton == hton) + { + if (likely(table_entry->state == GTID_POS_AVAILABLE)) + return &table_entry->table_name; + } + table_entry= table_entry->next; } - cur = delete_list; - while (cur) - { - uchar key_buffer[4+8]; - DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete", - { err= ENOENT; - table->file->print_error(err, MYF(0)); - /* `break' does not work inside DBUG_EXECUTE_IF */ - goto dbug_break; }); + table_entry= (struct gtid_pos_table *) + my_atomic_loadptr_explicit(&default_gtid_pos_table, MY_MEMORY_ORDER_ACQUIRE); + return &table_entry->table_name; +} - next= cur->next; - table->field[1]->store(cur->sub_id, true); - /* domain_id is already set in table->record[0] from write_row() above. */ - key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false); - if (table->file->ha_index_read_map(table->record[1], key_buffer, - HA_WHOLE_KEY, HA_READ_KEY_EXACT)) - /* We cannot find the row, assume it is already deleted. */ - ; - else if ((err= table->file->ha_delete_row(table->record[1]))) - table->file->print_error(err, MYF(0)); - /* - In case of error, we still discard the element from the list. We do - not want to endlessly error on the same element in case of table - corruption or such. - */ - cur= next; - if (err) - break; - } -IF_DBUG(dbug_break:, ) - table->file->ha_index_end(); +void +rpl_slave_state::gtid_delete_pending(THD *thd, + rpl_slave_state::list_element **list_ptr) +{ + int err= 0; + ulonglong thd_saved_option; -end: + if (unlikely(!loaded)) + return; #ifdef WITH_WSREP - thd->wsrep_ignore_table= false; + /* + Updates in slave state table should not be appended to galera transaction + writeset. + */ + thd->wsrep_ignore_table= true; #endif - if (table_opened) + thd_saved_option= thd->variables.option_bits; + thd->variables.option_bits&= + ~(ulonglong)(OPTION_NOT_AUTOCOMMIT |OPTION_BEGIN |OPTION_BIN_LOG | + OPTION_GTID_BEGIN); + + while (*list_ptr) { - if (err || (err= ha_commit_trans(thd, FALSE))) - { - /* - If error, we need to put any remaining delete_list back into the HASH - so we can do another delete attempt later. - */ - if (delete_list) - { - put_back_list(gtid->domain_id, delete_list); - delete_list = 0; - } + LEX_CSTRING *gtid_pos_table_name, *tmp_table_name; + Query_tables_list lex_backup; + TABLE_LIST tlist; + TABLE *table; + handler::Table_flags direct_pos; + list_element *cur, **cur_ptr_ptr; + bool table_opened= false; + void *hton= (*list_ptr)->hton; - ha_rollback_trans(thd, FALSE); + thd->reset_for_next_command(); + + /* + Only the SQL thread can call select_gtid_pos_table without a mutex + Other threads needs to use a mutex and take into account that the + result may change during execution, so we have to make a copy. + */ + mysql_mutex_lock(&LOCK_slave_state); + tmp_table_name= select_gtid_pos_table(hton); + gtid_pos_table_name= thd->make_clex_string(tmp_table_name->str, + tmp_table_name->length); + mysql_mutex_unlock(&LOCK_slave_state); + if (!gtid_pos_table_name) + { + /* Out of memory - we can try again later. */ + break; } - close_thread_tables(thd); - if (rgi) + + thd->lex->reset_n_backup_query_tables_list(&lex_backup); + tlist.init_one_table(&MYSQL_SCHEMA_NAME, gtid_pos_table_name, NULL, TL_WRITE); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; + + if ((err= gtid_check_rpl_slave_state_table(table))) + goto end; + + direct_pos= table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION; + bitmap_set_all(table->write_set); + table->rpl_write_set= table->write_set; + + /* Now delete any already committed GTIDs. */ + bitmap_set_bit(table->read_set, table->field[0]->field_index); + bitmap_set_bit(table->read_set, table->field[1]->field_index); + + if (!direct_pos && (err= table->file->ha_index_init(0, 0))) { - thd->mdl_context.release_statement_locks(); - /* - Save the list of old gtid entries we deleted. If this transaction - fails later for some reason and is rolled back, the deletion of those - entries will be rolled back as well, and we will need to put them back - on the to-be-deleted list so we can re-do the deletion. Otherwise - redundant rows in mysql.gtid_slave_pos may accumulate if transactions - are rolled back and retried after record_gtid(). - */ -#ifdef HAVE_REPLICATION - rgi->pending_gtid_deletes_save(gtid->domain_id, delete_list); -#endif + table->file->print_error(err, MYF(0)); + goto end; } - else + + cur = *list_ptr; + cur_ptr_ptr = list_ptr; + do { - thd->mdl_context.release_transactional_locks(); -#ifdef HAVE_REPLICATION - rpl_group_info::pending_gtid_deletes_free(delete_list); -#endif + uchar key_buffer[4+8]; + list_element *next= cur->next; + + if (cur->hton == hton) + { + int res; + + table->field[0]->store((ulonglong)cur->domain_id, true); + table->field[1]->store(cur->sub_id, true); + if (direct_pos) + { + res= table->file->ha_rnd_pos_by_record(table->record[0]); + } + else + { + key_copy(key_buffer, table->record[0], &table->key_info[0], 0, false); + res= table->file->ha_index_read_map(table->record[0], key_buffer, + HA_WHOLE_KEY, HA_READ_KEY_EXACT); + } + DBUG_EXECUTE_IF("gtid_slave_pos_simulate_failed_delete", + { res= 1; + err= ENOENT; + sql_print_error("<DEBUG> Error deleting old GTID row"); + }); + if (res) + /* We cannot find the row, assume it is already deleted. */ + ; + else if ((err= table->file->ha_delete_row(table->record[0]))) + { + sql_print_error("Error deleting old GTID row: %s", + thd->get_stmt_da()->message()); + /* + In case of error, we still discard the element from the list. We do + not want to endlessly error on the same element in case of table + corruption or such. + */ + } + *cur_ptr_ptr= next; + my_free(cur); + } + else + { + /* Leave this one in the list until we get to the table for its hton. */ + cur_ptr_ptr= &cur->next; + } + cur= next; + if (err) + break; + } while (cur); +end: + if (table_opened) + { + if (!direct_pos) + table->file->ha_index_end(); + + if (err || (err= ha_commit_trans(thd, FALSE))) + ha_rollback_trans(thd, FALSE); } + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(); + thd->lex->restore_backup_query_tables_list(&lex_backup); + + if (err) + break; } - thd->lex->restore_backup_query_tables_list(&lex_backup); thd->variables.option_bits= thd_saved_option; - thd->resume_subsequent_commits(suspended_wfc); - DBUG_EXECUTE_IF("inject_record_gtid_serverid_100_sleep", - { - if (gtid->server_id == 100) - my_sleep(500000); - }); - DBUG_RETURN(err); + +#ifdef WITH_WSREP + thd->wsrep_ignore_table= false; +#endif } @@ -1251,7 +1368,7 @@ rpl_slave_state::load(THD *thd, const char *state_from_master, size_t len, if (gtid_parser_helper(&state_from_master, end, >id) || !(sub_id= next_sub_id(gtid.domain_id)) || - record_gtid(thd, >id, sub_id, NULL, in_statement, &hton) || + record_gtid(thd, >id, sub_id, false, in_statement, &hton) || update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, hton, NULL)) return 1; if (state_from_master == end) |