summaryrefslogtreecommitdiff
path: root/sql/rpl_rli.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r--sql/rpl_rli.cc485
1 files changed, 436 insertions, 49 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index ed9f0369f5d..efb256fbe11 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software Foundation,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
-#include <my_global.h>
+#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h" // HAVE_*
#include "rpl_mi.h"
@@ -31,6 +31,8 @@
#include "slave.h"
#include <mysql/plugin.h>
#include <mysql/service_thd_wait.h>
+#include "lock.h"
+#include "sql_table.h"
static int count_relay_log_space(Relay_log_info* rli);
@@ -1509,41 +1511,22 @@ Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int
-rpl_load_gtid_slave_state(THD *thd)
+struct gtid_pos_element { uint64 sub_id; rpl_gtid gtid; void *hton; };
+
+static int
+scan_one_gtid_slave_pos_table(THD *thd, HASH *hash, DYNAMIC_ARRAY *array,
+ LEX_CSTRING *tablename, void **out_hton)
{
TABLE_LIST tlist;
TABLE *table;
bool table_opened= false;
bool table_scanned= false;
- bool array_inited= false;
- struct local_element { uint64 sub_id; rpl_gtid gtid; };
- struct local_element tmp_entry, *entry;
- HASH hash;
- DYNAMIC_ARRAY array;
+ struct gtid_pos_element tmp_entry, *entry;
int err= 0;
- uint32 i;
- DBUG_ENTER("rpl_load_gtid_slave_state");
-
- mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
- bool loaded= rpl_global_gtid_slave_state->loaded;
- mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
- if (loaded)
- DBUG_RETURN(0);
-
- my_hash_init(&hash, &my_charset_bin, 32,
- offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id),
- sizeof(uint32), NULL, my_free, HASH_UNIQUE);
- if ((err= my_init_dynamic_array(&array, sizeof(local_element), 0, 0, MYF(0))))
- goto end;
- array_inited= true;
thd->reset_for_next_command();
-
- tlist.init_one_table(STRING_WITH_LEN("mysql"),
- rpl_gtid_slave_state_table_name.str,
- rpl_gtid_slave_state_table_name.length,
- NULL, TL_READ);
+ tlist.init_one_table(STRING_WITH_LEN("mysql"), tablename->str,
+ tablename->length, NULL, TL_READ);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end;
table_opened= true;
@@ -1589,26 +1572,28 @@ rpl_load_gtid_slave_state(THD *thd)
tmp_entry.gtid.domain_id= domain_id;
tmp_entry.gtid.server_id= server_id;
tmp_entry.gtid.seq_no= seq_no;
- if ((err= insert_dynamic(&array, (uchar *)&tmp_entry)))
+ tmp_entry.hton= table->s->db_type();
+ if ((err= insert_dynamic(array, (uchar *)&tmp_entry)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
- if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ if ((rec= my_hash_search(hash, (const uchar *)&domain_id, 0)))
{
- entry= (struct local_element *)rec;
+ entry= (struct gtid_pos_element *)rec;
if (entry->sub_id >= sub_id)
continue;
entry->sub_id= sub_id;
DBUG_ASSERT(entry->gtid.domain_id == domain_id);
entry->gtid.server_id= server_id;
entry->gtid.seq_no= seq_no;
+ entry->hton= table->s->db_type();
}
else
{
- if (!(entry= (struct local_element *)my_malloc(sizeof(*entry),
- MYF(MY_WME))))
+ if (!(entry= (struct gtid_pos_element *)my_malloc(sizeof(*entry),
+ MYF(MY_WME))))
{
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry));
err= 1;
@@ -1618,7 +1603,8 @@ rpl_load_gtid_slave_state(THD *thd)
entry->gtid.domain_id= domain_id;
entry->gtid.server_id= server_id;
entry->gtid.seq_no= seq_no;
- if ((err= my_hash_insert(&hash, (uchar *)entry)))
+ entry->hton= table->s->db_type();
+ if ((err= my_hash_insert(hash, (uchar *)entry)))
{
my_free(entry);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
@@ -1626,6 +1612,249 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
}
+ err= 0; /* Clear HA_ERR_END_OF_FILE */
+
+end:
+ if (table_scanned)
+ {
+ table->file->ha_index_or_rnd_end();
+ ha_commit_trans(thd, FALSE);
+ ha_commit_trans(thd, TRUE);
+ }
+ if (table_opened)
+ {
+ *out_hton= table->s->db_type();
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ }
+ return err;
+}
+
+
+/*
+ Look for all tables mysql.gtid_slave_pos*. Read all rows from each such
+ table found into ARRAY. For each domain id, put the row with highest sub_id
+ into HASH.
+*/
+static int
+scan_all_gtid_slave_pos_table(THD *thd, int (*cb)(THD *, LEX_CSTRING *, void *),
+ void *cb_data)
+{
+ static LEX_CSTRING mysql_db_name= {C_STRING_WITH_LEN("mysql")};
+ char path[FN_REFLEN];
+ MY_DIR *dirp;
+
+ thd->reset_for_next_command();
+ if (lock_schema_name(thd, mysql_db_name.str))
+ return 1;
+
+ build_table_filename(path, sizeof(path) - 1, mysql_db_name.str, "", "", 0);
+ if (!(dirp= my_dir(path, MYF(MY_DONT_SORT))))
+ {
+ my_error(ER_FILE_NOT_FOUND, MYF(0), path, my_errno);
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ return 1;
+ }
+ else
+ {
+ size_t i;
+ Dynamic_array<LEX_CSTRING*> files(dirp->number_of_files);
+ Discovered_table_list tl(thd, &files);
+ int err;
+
+ err= ha_discover_table_names(thd, &mysql_db_name, dirp, &tl, false);
+ my_dirend(dirp);
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ if (err)
+ return err;
+
+ for (i = 0; i < files.elements(); ++i)
+ {
+ if (strncmp(files.at(i)->str,
+ rpl_gtid_slave_state_table_name.str,
+ rpl_gtid_slave_state_table_name.length) == 0)
+ {
+ if ((err= (*cb)(thd, files.at(i), cb_data)))
+ return err;
+ }
+ }
+ }
+
+ return 0;
+}
+
+
+struct load_gtid_state_cb_data {
+ HASH *hash;
+ DYNAMIC_ARRAY *array;
+ struct rpl_slave_state::gtid_pos_table *table_list;
+ struct rpl_slave_state::gtid_pos_table *default_entry;
+};
+
+static int
+process_gtid_pos_table(THD *thd, LEX_CSTRING *table_name, void *hton,
+ struct load_gtid_state_cb_data *data)
+{
+ struct rpl_slave_state::gtid_pos_table *p, *entry, **next_ptr;
+ bool is_default=
+ (strcmp(table_name->str, rpl_gtid_slave_state_table_name.str) == 0);
+
+ /*
+ Ignore tables with duplicate storage engine, with a warning.
+ Prefer the default mysql.gtid_slave_pos over another table
+ mysql.gtid_slave_posXXX with the same storage engine.
+ */
+ next_ptr= &data->table_list;
+ entry= data->table_list;
+ while (entry)
+ {
+ if (entry->table_hton == hton)
+ {
+ static const char *warning_msg= "Ignoring redundant table mysql.%s "
+ "since mysql.%s has the same storage engine";
+ if (!is_default)
+ {
+ /* Ignore the redundant table. */
+ sql_print_warning(warning_msg, table_name->str, entry->table_name);
+ return 0;
+ }
+ else
+ {
+ sql_print_warning(warning_msg, entry->table_name, table_name->str);
+ /* Delete the redundant table, and proceed to add this one instead. */
+ *next_ptr= entry->next;
+ my_free(entry);
+ break;
+ }
+ }
+ next_ptr= &entry->next;
+ entry= entry->next;
+ }
+
+ p= rpl_global_gtid_slave_state->alloc_gtid_pos_table(table_name,
+ hton, rpl_slave_state::GTID_POS_AVAILABLE);
+ if (!p)
+ return 1;
+ p->next= data->table_list;
+ data->table_list= p;
+ if (is_default)
+ data->default_entry= p;
+ return 0;
+}
+
+
+/*
+ Put tables corresponding to @@gtid_pos_auto_engines at the end of the list,
+ marked to be auto-created if needed.
+*/
+static int
+gtid_pos_auto_create_tables(rpl_slave_state::gtid_pos_table **list_ptr)
+{
+ plugin_ref *auto_engines;
+ int err= 0;
+ mysql_mutex_lock(&LOCK_global_system_variables);
+ for (auto_engines= opt_gtid_pos_auto_plugins;
+ !err && auto_engines && *auto_engines;
+ ++auto_engines)
+ {
+ void *hton= plugin_hton(*auto_engines);
+ char buf[FN_REFLEN+1];
+ LEX_CSTRING table_name;
+ char *p;
+ rpl_slave_state::gtid_pos_table *entry, **next_ptr;
+
+ /* See if this engine is already in the list. */
+ next_ptr= list_ptr;
+ entry= *list_ptr;
+ while (entry)
+ {
+ if (entry->table_hton == hton)
+ break;
+ next_ptr= &entry->next;
+ entry= entry->next;
+ }
+ if (entry)
+ continue;
+
+ /* Add an auto-create entry for this engine at end of list. */
+ p= strmake(buf, rpl_gtid_slave_state_table_name.str, FN_REFLEN);
+ p= strmake(p, "_", FN_REFLEN - (p - buf));
+ p= strmake(p, plugin_name(*auto_engines)->str, FN_REFLEN - (p - buf));
+ table_name.str= buf;
+ table_name.length= p - buf;
+ entry= rpl_global_gtid_slave_state->alloc_gtid_pos_table
+ (&table_name, hton, rpl_slave_state::GTID_POS_AUTO_CREATE);
+ if (!entry)
+ {
+ err= 1;
+ break;
+ }
+ *next_ptr= entry;
+ }
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ return err;
+}
+
+
+static int
+load_gtid_state_cb(THD *thd, LEX_CSTRING *table_name, void *arg)
+{
+ int err;
+ load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
+ void *hton;
+
+ if ((err= scan_one_gtid_slave_pos_table(thd, data->hash, data->array,
+ table_name, &hton)))
+ return err;
+ return process_gtid_pos_table(thd, table_name, hton, data);
+}
+
+
+int
+rpl_load_gtid_slave_state(THD *thd)
+{
+ bool array_inited= false;
+ struct gtid_pos_element tmp_entry, *entry;
+ HASH hash;
+ DYNAMIC_ARRAY array;
+ int err= 0;
+ uint32 i;
+ load_gtid_state_cb_data cb_data;
+ DBUG_ENTER("rpl_load_gtid_slave_state");
+
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ bool loaded= rpl_global_gtid_slave_state->loaded;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (loaded)
+ DBUG_RETURN(0);
+
+ cb_data.table_list= NULL;
+ cb_data.default_entry= NULL;
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(gtid_pos_element, gtid) + offsetof(rpl_gtid, domain_id),
+ sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ if ((err= my_init_dynamic_array(&array, sizeof(gtid_pos_element), 0, 0, MYF(0))))
+ goto end;
+ array_inited= true;
+
+ cb_data.hash = &hash;
+ cb_data.array = &array;
+ if ((err= scan_all_gtid_slave_pos_table(thd, load_gtid_state_cb, &cb_data)))
+ goto end;
+
+ if (!cb_data.default_entry)
+ {
+ /*
+ If the mysql.gtid_slave_pos table does not exist, but at least one other
+ table is available, arbitrarily pick the first in the list to use as
+ default.
+ */
+ cb_data.default_entry= cb_data.table_list;
+ }
+ if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
+ goto end;
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
if (rpl_global_gtid_slave_state->loaded)
@@ -1634,14 +1863,24 @@ rpl_load_gtid_slave_state(THD *thd)
goto end;
}
+ if (!cb_data.table_list)
+ {
+ my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
+ rpl_gtid_slave_state_table_name.str);
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ err= 1;
+ goto end;
+ }
+
for (i= 0; i < array.elements; ++i)
{
get_dynamic(&array, (uchar *)&tmp_entry, i);
if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id,
- tmp_entry.gtid.server_id,
- tmp_entry.sub_id,
- tmp_entry.gtid.seq_no,
- NULL)))
+ tmp_entry.gtid.server_id,
+ tmp_entry.sub_id,
+ tmp_entry.gtid.seq_no,
+ tmp_entry.hton,
+ NULL)))
{
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
@@ -1651,7 +1890,7 @@ rpl_load_gtid_slave_state(THD *thd)
for (i= 0; i < hash.records; ++i)
{
- entry= (struct local_element *)my_hash_element(&hash, i);
+ entry= (struct gtid_pos_element *)my_hash_element(&hash, i);
if (opt_bin_log &&
mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
entry->gtid.seq_no))
@@ -1662,27 +1901,175 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
+ rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
+ cb_data.default_entry);
+ cb_data.table_list= NULL;
rpl_global_gtid_slave_state->loaded= true;
mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
- err= 0; /* Clear HA_ERR_END_OF_FILE */
+end:
+ if (array_inited)
+ delete_dynamic(&array);
+ my_hash_free(&hash);
+ if (cb_data.table_list)
+ rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
+ DBUG_RETURN(err);
+}
+
+
+static int
+find_gtid_pos_tables_cb(THD *thd, LEX_CSTRING *table_name, void *arg)
+{
+ load_gtid_state_cb_data *data= static_cast<load_gtid_state_cb_data *>(arg);
+ TABLE_LIST tlist;
+ TABLE *table= NULL;
+ int err;
+
+ thd->reset_for_next_command();
+ tlist.init_one_table(STRING_WITH_LEN("mysql"), table_name->str,
+ table_name->length, NULL, TL_READ);
+ if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ goto end;
+ table= tlist.table;
+
+ if ((err= gtid_check_rpl_slave_state_table(table)))
+ goto end;
+ err= process_gtid_pos_table(thd, table_name, table->s->db_type(), data);
end:
- if (table_scanned)
+ if (table)
{
- table->file->ha_index_or_rnd_end();
ha_commit_trans(thd, FALSE);
ha_commit_trans(thd, TRUE);
- }
- if (table_opened)
- {
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
}
- if (array_inited)
- delete_dynamic(&array);
- my_hash_free(&hash);
- DBUG_RETURN(err);
+
+ return err;
+}
+
+
+/*
+ Re-compute the list of available mysql.gtid_slave_posXXX tables.
+
+ This is done at START SLAVE to pick up any newly created tables without
+ requiring server restart.
+*/
+int
+find_gtid_slave_pos_tables(THD *thd)
+{
+ int err= 0;
+ load_gtid_state_cb_data cb_data;
+ uint num_running;
+
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ bool loaded= rpl_global_gtid_slave_state->loaded;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (!loaded)
+ return 0;
+
+ cb_data.table_list= NULL;
+ cb_data.default_entry= NULL;
+ if ((err= scan_all_gtid_slave_pos_table(thd, find_gtid_pos_tables_cb, &cb_data)))
+ goto end;
+
+ if (!cb_data.table_list)
+ {
+ my_error(ER_NO_SUCH_TABLE, MYF(0), "mysql",
+ rpl_gtid_slave_state_table_name.str);
+ err= 1;
+ goto end;
+ }
+ if (!cb_data.default_entry)
+ {
+ /*
+ If the mysql.gtid_slave_pos table does not exist, but at least one other
+ table is available, arbitrarily pick the first in the list to use as
+ default.
+ */
+ cb_data.default_entry= cb_data.table_list;
+ }
+ if ((err= gtid_pos_auto_create_tables(&cb_data.table_list)))
+ goto end;
+
+ mysql_mutex_lock(&LOCK_active_mi);
+ num_running= any_slave_sql_running(true);
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (num_running <= 1)
+ {
+ /*
+ If no slave is running now, the count will be 1, since this SQL thread
+ which is starting is included in the count. In this case, we can safely
+ replace the list, no-one can be trying to read it without lock.
+ */
+ DBUG_ASSERT(num_running == 1);
+ rpl_global_gtid_slave_state->set_gtid_pos_tables_list(cb_data.table_list,
+ cb_data.default_entry);
+ cb_data.table_list= NULL;
+ }
+ else
+ {
+ /*
+ If there are SQL threads running, we cannot safely remove the old list.
+ However we can add new entries, and warn about any tables that
+ disappeared, but may still be visible to running SQL threads.
+ */
+ rpl_slave_state::gtid_pos_table *old_entry, *new_entry, **next_ptr_ptr;
+
+ old_entry= (rpl_slave_state::gtid_pos_table *)
+ rpl_global_gtid_slave_state->gtid_pos_tables;
+ while (old_entry)
+ {
+ new_entry= cb_data.table_list;
+ while (new_entry)
+ {
+ if (new_entry->table_hton == old_entry->table_hton)
+ break;
+ new_entry= new_entry->next;
+ }
+ if (!new_entry)
+ sql_print_warning("The table mysql.%s was removed. "
+ "This change will not take full effect "
+ "until all SQL threads have been restarted",
+ old_entry->table_name.str);
+ old_entry= old_entry->next;
+ }
+ next_ptr_ptr= &cb_data.table_list;
+ new_entry= cb_data.table_list;
+ while (new_entry)
+ {
+ /* Check if we already have a table with this storage engine. */
+ old_entry= (rpl_slave_state::gtid_pos_table *)
+ rpl_global_gtid_slave_state->gtid_pos_tables;
+ while (old_entry)
+ {
+ if (new_entry->table_hton == old_entry->table_hton)
+ break;
+ old_entry= old_entry->next;
+ }
+ if (old_entry)
+ {
+ /* This new_entry is already available in the list. */
+ next_ptr_ptr= &new_entry->next;
+ new_entry= new_entry->next;
+ }
+ else
+ {
+ /* Move this new_entry to the list. */
+ rpl_slave_state::gtid_pos_table *next= new_entry->next;
+ rpl_global_gtid_slave_state->add_gtid_pos_table(new_entry);
+ *next_ptr_ptr= next;
+ new_entry= next;
+ }
+ }
+ }
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ mysql_mutex_unlock(&LOCK_active_mi);
+
+end:
+ if (cb_data.table_list)
+ rpl_global_gtid_slave_state->free_gtid_pos_tables(cb_data.table_list);
+ return err;
}