diff options
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r-- | sql/rpl_rli.cc | 485 |
1 files changed, 436 insertions, 49 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index ed9f0369f5d..efb256fbe11 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -14,7 +14,7 @@ along with this program; if not, write to the Free Software Foundation, 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ -#include <my_global.h> +#include "mariadb.h" #include "sql_priv.h" #include "unireg.h" // HAVE_* #include "rpl_mi.h" @@ -31,6 +31,8 @@ #include "slave.h" #include <mysql/plugin.h> #include <mysql/service_thd_wait.h> +#include "lock.h" +#include "sql_table.h" static int count_relay_log_space(Relay_log_info* rli); @@ -1509,41 +1511,22 @@ Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int -rpl_load_gtid_slave_state(THD *thd) +struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; }; + +static int +scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array, + LEX_CSTRING *tablename, void **out_hton) { TABLE_LIST tlist; TABLE *table; bool table_opened= false; bool table_scanned= false; - bool array_inited= false; - struct local_element { uint64 sub_id; rpl_gtid gtid; }; - struct local_element tmp_entry, *entry; - HASH hash; - DYNAMIC_ARRAY array; + struct gtid_pos_element tmp_entry, *entry; int err= 0; - uint32 i; - DBUG_ENTER("rpl_load_gtid_slave_state"); - - mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); - bool loaded= rpl_global_gtid_slave_state->loaded; - mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); - if (loaded) - DBUG_RETURN(0); - - my_hash_init(&hash, &my_charset_bin, 32, - offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id), - sizeof(uint32), NULL, my_free, HASH_UNIQUE); - if ((err= my_init_dynamic_array(&array, sizeof(local_element), 0, 0, MYF(0)))) - goto end; - array_inited= true; thd->reset_for_next_command(); - - tlist.init_one_table(STRING_WITH_LEN("mysql"), - rpl_gtid_slave_state_table_name.str, - rpl_gtid_slave_state_table_name.length, - NULL, TL_READ); + tlist.init_one_table(STRING_WITH_LEN("mysql"), tablename->str, + tablename->length, NULL, TL_READ); if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) goto end; table_opened= true; @@ -1589,26 +1572,28 @@ rpl_load_gtid_slave_state(THD *thd) tmp_entry.gtid.domain_id= domain_id; tmp_entry.gtid.server_id= server_id; tmp_entry.gtid.seq_no= seq_no; - if ((err= insert_dynamic(&array, (uchar *)&tmp_entry))) + tmp_entry.hton= table->s->db_type(); + if ((err= insert_dynamic(array, (uchar *)&tmp_entry))) { my_error(ER_OUT_OF_RESOURCES, MYF(0)); goto end; } - if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0))) + if ((rec= my_hash_search(hash, (const uchar *)&domain_id, 0))) { - entry= (struct local_element *)rec; + entry= (struct gtid_pos_element *)rec; if (entry->sub_id >= sub_id) continue; entry->sub_id= sub_id; DBUG_ASSERT(entry->gtid.domain_id == domain_id); entry->gtid.server_id= server_id; entry->gtid.seq_no= seq_no; + entry->hton= table->s->db_type(); } else { - if (!(entry= (struct local_element *)my_malloc(sizeof(*entry), - MYF(MY_WME)))) + if (!(entry= (struct gtid_pos_element *)my_malloc(sizeof(*entry), + MYF(MY_WME)))) { my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry)); err= 1; @@ -1618,7 +1603,8 @@ rpl_load_gtid_slave_state(THD *thd) entry->gtid.domain_id= domain_id; entry->gtid.server_id= server_id; entry->gtid.seq_no= seq_no; - if ((err= my_hash_insert(&hash, (uchar *)entry))) + entry->hton= table->s->db_type(); + if ((err= my_hash_insert(hash, (uchar *)entry))) { my_free(entry); my_error(ER_OUT_OF_RESOURCES, MYF(0)); @@ -1626,6 +1612,249 @@ rpl_load_gtid_slave_state(THD *thd) } } } + err= 0; /* Clear HA_ERR_END_OF_FILE */ + +end: + if (table_scanned) + { + table->file->ha_index_or_rnd_end(); + ha_commit_trans(thd, FALSE); + ha_commit_trans(thd, TRUE); + } + if (table_opened) + { + *out_hton= table->s->db_type(); + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(); + } + return err; +} + + +/* + Look for all tables mysql.gtid_slave_pos*. Read all rows from each such + table found into ARRAY. For each domain id, put the row with highest sub_id + into HASH. +*/ +static int +scan_all_gtid_slave_pos_table(THD *thd, int (*cb)(THD *, LEX_CSTRING *, void *), + void *cb_data) +{ + static LEX_CSTRING mysql_db_name= {C_STRING_WITH_LEN("mysql")}; + char path[FN_REFLEN]; + MY_DIR *dirp; + + thd->reset_for_next_command(); + if (lock_schema_name(thd, mysql_db_name.str)) + return 1; + + build_table_filename(path, sizeof(path) - 1, mysql_db_name.str, "", "", 0); + if (!(dirp= my_dir(path, MYF(MY_DONT_SORT)))) + { + my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno); + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(); + return 1; + } + else + { + size_t i; + Dynamic_array<LEX_CSTRING*> files(dirp->number_of_files); + Discovered_table_list tl(thd, &files); + int err; + + err= ha_discover_table_names(thd, &mysql_db_name, dirp, &tl, false); + my_dirend(dirp); + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(); + if (err) + return err; + + for (i = 0; i < files.elements(); ++i) + { + if (strncmp(files.at(i)->str, + rpl_gtid_slave_state_table_name.str, + rpl_gtid_slave_state_table_name.length) == 0) + { + if ((err= (*cb)(thd, files.at(i), cb_data))) + return err; + } + } + } + + return 0; +} + + +struct load_gtid_state_cb_data { + HASH *hash; + DYNAMIC_ARRAY *array; + struct rpl_slave_state::gtid_pos_table *table_list; + struct rpl_slave_state::gtid_pos_table *default_entry; +}; + +static int +process_gtid_pos_table(THD *thd, LEX_CSTRING *table_name, void *hton, + struct load_gtid_state_cb_data *data) +{ + struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr; + bool is_default= + (strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0); + + /* + Ignore tables with duplicate storage engine, with a warning. + Prefer the default mysql.gtid_slave_pos over another table + mysql.gtid_slave_posXXX with the same storage engine. + */ + next_ptr= &data->table_list; + entry= data->table_list; + while (entry) + { + if (entry->table_hton == hton) + { + static const char *warning_msg= "Ignoring redundant table mysql.%s " + "since mysql.%s has the same storage engine"; + if (!is_default) + { + /* Ignore the redundant table. */ + sql_print_warning(warning_msg, table_name->str, entry->table_name); + return 0; + } + else + { + sql_print_warning(warning_msg, entry->table_name, table_name->str); + /* Delete the redundant table, and proceed to add this one instead. */ + *next_ptr= entry->next; + my_free(entry); + break; + } + } + next_ptr= &entry->next; + entry= entry->next; + } + + p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name, + hton, rpl_slave_state::GTID_POS_AVAILABLE); + if (!p) + return 1; + p->next= data->table_list; + data->table_list= p; + if (is_default) + data->default_entry= p; + return 0; +} + + +/* + Put tables corresponding to @@gtid_pos_auto_engines at the end of the list, + marked to be auto-created if needed. +*/ +static int +gtid_pos_auto_create_tables(rpl_slave_state::gtid_pos_table **list_ptr) +{ + plugin_ref *auto_engines; + int err= 0; + mysql_mutex_lock(&LOCK_global_system_variables); + for (auto_engines= opt_gtid_pos_auto_plugins; + !err && auto_engines && *auto_engines; + ++auto_engines) + { + void *hton= plugin_hton(*auto_engines); + char buf[FN_REFLEN+1]; + LEX_CSTRING table_name; + char *p; + rpl_slave_state::gtid_pos_table *entry, **next_ptr; + + /* See if this engine is already in the list. */ + next_ptr= list_ptr; + entry= *list_ptr; + while (entry) + { + if (entry->table_hton == hton) + break; + next_ptr= &entry->next; + entry= entry->next; + } + if (entry) + continue; + + /* Add an auto-create entry for this engine at end of list. */ + p= strmake(buf, rpl_gtid_slave_state_table_name.str, FN_REFLEN); + p= strmake(p, "_", FN_REFLEN - (p - buf)); + p= strmake(p, plugin_name(*auto_engines)->str, FN_REFLEN - (p - buf)); + table_name.str= buf; + table_name.length= p - buf; + entry= rpl_global_gtid_slave_state->alloc_gtid_pos_table + (&table_name, hton, rpl_slave_state::GTID_POS_AUTO_CREATE); + if (!entry) + { + err= 1; + break; + } + *next_ptr= entry; + } + mysql_mutex_unlock(&LOCK_global_system_variables); + return err; +} + + +static int +load_gtid_state_cb(THD *thd, LEX_CSTRING *table_name, void *arg) +{ + int err; + load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg); + void *hton; + + if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array, + table_name, &hton))) + return err; + return process_gtid_pos_table(thd, table_name, hton, data); +} + + +int +rpl_load_gtid_slave_state(THD *thd) +{ + bool array_inited= false; + struct gtid_pos_element tmp_entry, *entry; + HASH hash; + DYNAMIC_ARRAY array; + int err= 0; + uint32 i; + load_gtid_state_cb_data cb_data; + DBUG_ENTER("rpl_load_gtid_slave_state"); + + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + bool loaded= rpl_global_gtid_slave_state->loaded; + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + if (loaded) + DBUG_RETURN(0); + + cb_data.table_list= NULL; + cb_data.default_entry= NULL; + my_hash_init(&hash, &my_charset_bin, 32, + offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id), + sizeof(uint32), NULL, my_free, HASH_UNIQUE); + if ((err= my_init_dynamic_array(&array, sizeof(gtid_pos_element), 0, 0, MYF(0)))) + goto end; + array_inited= true; + + cb_data.hash = &hash; + cb_data.array = &array; + if ((err= scan_all_gtid_slave_pos_table(thd, load_gtid_state_cb, &cb_data))) + goto end; + + if (!cb_data.default_entry) + { + /* + If the mysql.gtid_slave_pos table does not exist, but at least one other + table is available, arbitrarily pick the first in the list to use as + default. + */ + cb_data.default_entry= cb_data.table_list; + } + if ((err= gtid_pos_auto_create_tables(&cb_data.table_list))) + goto end; mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); if (rpl_global_gtid_slave_state->loaded) @@ -1634,14 +1863,24 @@ rpl_load_gtid_slave_state(THD *thd) goto end; } + if (!cb_data.table_list) + { + my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql", + rpl_gtid_slave_state_table_name.str); + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + err= 1; + goto end; + } + for (i= 0; i < array.elements; ++i) { get_dynamic(&array, (uchar *)&tmp_entry, i); if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id, - tmp_entry.gtid.server_id, - tmp_entry.sub_id, - tmp_entry.gtid.seq_no, - NULL))) + tmp_entry.gtid.server_id, + tmp_entry.sub_id, + tmp_entry.gtid.seq_no, + tmp_entry.hton, + NULL))) { mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); my_error(ER_OUT_OF_RESOURCES, MYF(0)); @@ -1651,7 +1890,7 @@ rpl_load_gtid_slave_state(THD *thd) for (i= 0; i < hash.records; ++i) { - entry= (struct local_element *)my_hash_element(&hash, i); + entry= (struct gtid_pos_element *)my_hash_element(&hash, i); if (opt_bin_log && mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id, entry->gtid.seq_no)) @@ -1662,27 +1901,175 @@ rpl_load_gtid_slave_state(THD *thd) } } + rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list, + cb_data.default_entry); + cb_data.table_list= NULL; rpl_global_gtid_slave_state->loaded= true; mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); - err= 0; /* Clear HA_ERR_END_OF_FILE */ +end: + if (array_inited) + delete_dynamic(&array); + my_hash_free(&hash); + if (cb_data.table_list) + rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list); + DBUG_RETURN(err); +} + + +static int +find_gtid_pos_tables_cb(THD *thd, LEX_CSTRING *table_name, void *arg) +{ + load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg); + TABLE_LIST tlist; + TABLE *table= NULL; + int err; + + thd->reset_for_next_command(); + tlist.init_one_table(STRING_WITH_LEN("mysql"), table_name->str, + table_name->length, NULL, TL_READ); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table= tlist.table; + + if ((err= gtid_check_rpl_slave_state_table(table))) + goto end; + err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data); end: - if (table_scanned) + if (table) { - table->file->ha_index_or_rnd_end(); ha_commit_trans(thd, FALSE); ha_commit_trans(thd, TRUE); - } - if (table_opened) - { close_thread_tables(thd); thd->mdl_context.release_transactional_locks(); } - if (array_inited) - delete_dynamic(&array); - my_hash_free(&hash); - DBUG_RETURN(err); + + return err; +} + + +/* + Re-compute the list of available mysql.gtid_slave_posXXX tables. + + This is done at START SLAVE to pick up any newly created tables without + requiring server restart. +*/ +int +find_gtid_slave_pos_tables(THD *thd) +{ + int err= 0; + load_gtid_state_cb_data cb_data; + uint num_running; + + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + bool loaded= rpl_global_gtid_slave_state->loaded; + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + if (!loaded) + return 0; + + cb_data.table_list= NULL; + cb_data.default_entry= NULL; + if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data))) + goto end; + + if (!cb_data.table_list) + { + my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql", + rpl_gtid_slave_state_table_name.str); + err= 1; + goto end; + } + if (!cb_data.default_entry) + { + /* + If the mysql.gtid_slave_pos table does not exist, but at least one other + table is available, arbitrarily pick the first in the list to use as + default. + */ + cb_data.default_entry= cb_data.table_list; + } + if ((err= gtid_pos_auto_create_tables(&cb_data.table_list))) + goto end; + + mysql_mutex_lock(&LOCK_active_mi); + num_running= any_slave_sql_running(true); + mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); + if (num_running <= 1) + { + /* + If no slave is running now, the count will be 1, since this SQL thread + which is starting is included in the count. In this case, we can safely + replace the list, no-one can be trying to read it without lock. + */ + DBUG_ASSERT(num_running == 1); + rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list, + cb_data.default_entry); + cb_data.table_list= NULL; + } + else + { + /* + If there are SQL threads running, we cannot safely remove the old list. + However we can add new entries, and warn about any tables that + disappeared, but may still be visible to running SQL threads. + */ + rpl_slave_state::gtid_pos_table *old_entry, *new_entry, **next_ptr_ptr; + + old_entry= (rpl_slave_state::gtid_pos_table *) + rpl_global_gtid_slave_state->gtid_pos_tables; + while (old_entry) + { + new_entry= cb_data.table_list; + while (new_entry) + { + if (new_entry->table_hton == old_entry->table_hton) + break; + new_entry= new_entry->next; + } + if (!new_entry) + sql_print_warning("The table mysql.%s was removed. " + "This change will not take full effect " + "until all SQL threads have been restarted", + old_entry->table_name.str); + old_entry= old_entry->next; + } + next_ptr_ptr= &cb_data.table_list; + new_entry= cb_data.table_list; + while (new_entry) + { + /* Check if we already have a table with this storage engine. */ + old_entry= (rpl_slave_state::gtid_pos_table *) + rpl_global_gtid_slave_state->gtid_pos_tables; + while (old_entry) + { + if (new_entry->table_hton == old_entry->table_hton) + break; + old_entry= old_entry->next; + } + if (old_entry) + { + /* This new_entry is already available in the list. */ + next_ptr_ptr= &new_entry->next; + new_entry= new_entry->next; + } + else + { + /* Move this new_entry to the list. */ + rpl_slave_state::gtid_pos_table *next= new_entry->next; + rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry); + *next_ptr_ptr= next; + new_entry= next; + } + } + } + mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state); + mysql_mutex_unlock(&LOCK_active_mi); + +end: + if (cb_data.table_list) + rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list); + return err; } |