summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorDavi Arnaut <davi@twitter.com>2012-03-19 15:00:23 -0700
committerunknown <knielsen@knielsen-hq.org>2012-03-19 15:00:23 -0700
commit9584cbe7fcc4ea98598087848f96c5e28c15d1d8 (patch)
tree57c734af5b99a03e1bdc8fb02ac4de1e7a72fc02 /sql
parent7789f3d56738c581d8e805650f83b626381075ea (diff)
downloadmariadb-git-9584cbe7fcc4ea98598087848f96c5e28c15d1d8.tar.gz
Make Replication filter settings dynamic.
Make the slave options --replicate-* dynamic variables so that these options can be changed dynamically while the server is running, which enables users to modify replication filtering rules without having to stop and restart the server. This is accomplished by just requiring that the slave threads are stopped when these options are set dynamically. Since filtering rules are only used by the SQL slave thread, setting them while the thread is not running avoids the need for locking.
Diffstat (limited to 'sql')
-rw-r--r--sql/rpl_filter.cc236
-rw-r--r--sql/rpl_filter.h24
-rw-r--r--sql/sys_vars.cc144
-rw-r--r--sql/sys_vars.h50
4 files changed, 437 insertions, 17 deletions
diff --git a/sql/rpl_filter.cc b/sql/rpl_filter.cc
index 5f5473e09ab..0380bc323a3 100644
--- a/sql/rpl_filter.cc
+++ b/sql/rpl_filter.cc
@@ -42,8 +42,8 @@ Rpl_filter::~Rpl_filter()
free_string_array(&wild_do_table);
if (wild_ignore_table_inited)
free_string_array(&wild_ignore_table);
- free_list(&do_db);
- free_list(&ignore_db);
+ free_string_list(&do_db);
+ free_string_list(&ignore_db);
free_list(&rewrite_db);
}
@@ -263,6 +263,57 @@ Rpl_filter::is_on()
}
+/**
+ Parse and add the given comma-separated sequence of filter rules.
+
+ @param spec Comma-separated sequence of filter rules.
+ @param add Callback member function to add a filter rule.
+
+ @return true if error, false otherwise.
+*/
+
+int
+Rpl_filter::parse_filter_rule(const char* spec, Add_filter add)
+{
+ int status= 0;
+ char *arg, *ptr, *pstr;
+
+ if (! (ptr= my_strdup(spec, MYF(MY_WME))))
+ return true;
+
+ pstr= ptr;
+
+ while (pstr)
+ {
+ arg= pstr;
+
+ /* Parse token string. */
+ pstr= strpbrk(arg, ",");
+
+ /* NUL terminate the token string. */
+ if (pstr)
+ *pstr++= '\0';
+
+ /* Skip an empty token string. */
+ if (arg[0] == '\0')
+ continue;
+
+ /* Skip leading spaces. */
+ while (my_isspace(system_charset_info, *arg))
+ arg++;
+
+ status= (this->*add)(arg);
+
+ if (status)
+ break;
+ }
+
+ my_free(ptr);
+
+ return status;
+}
+
+
int
Rpl_filter::add_do_table(const char* table_spec)
{
@@ -285,6 +336,46 @@ Rpl_filter::add_ignore_table(const char* table_spec)
}
+int
+Rpl_filter::set_do_table(const char* table_spec)
+{
+ int status;
+
+ if (do_table_inited)
+ my_hash_reset(&do_table);
+
+ status= parse_filter_rule(table_spec, &Rpl_filter::add_do_table);
+
+ if (!do_table.records)
+ {
+ my_hash_free(&do_table);
+ do_table_inited= 0;
+ }
+
+ return status;
+}
+
+
+int
+Rpl_filter::set_ignore_table(const char* table_spec)
+{
+ int status;
+
+ if (ignore_table_inited)
+ my_hash_reset(&ignore_table);
+
+ status= parse_filter_rule(table_spec, &Rpl_filter::add_ignore_table);
+
+ if (!ignore_table.records)
+ {
+ my_hash_free(&ignore_table);
+ ignore_table_inited= 0;
+ }
+
+ return status;
+}
+
+
int
Rpl_filter::add_wild_do_table(const char* table_spec)
{
@@ -307,6 +398,46 @@ Rpl_filter::add_wild_ignore_table(const char* table_spec)
}
+int
+Rpl_filter::set_wild_do_table(const char* table_spec)
+{
+ int status;
+
+ if (wild_do_table_inited)
+ free_string_array(&wild_do_table);
+
+ status= parse_filter_rule(table_spec, &Rpl_filter::add_wild_do_table);
+
+ if (!wild_do_table.elements)
+ {
+ delete_dynamic(&wild_do_table);
+ wild_do_table_inited= 0;
+ }
+
+ return status;
+}
+
+
+int
+Rpl_filter::set_wild_ignore_table(const char* table_spec)
+{
+ int status;
+
+ if (wild_ignore_table_inited)
+ free_string_array(&wild_ignore_table);
+
+ status= parse_filter_rule(table_spec, &Rpl_filter::add_wild_ignore_table);
+
+ if (!wild_ignore_table.elements)
+ {
+ delete_dynamic(&wild_ignore_table);
+ wild_ignore_table_inited= 0;
+ }
+
+ return status;
+}
+
+
void
Rpl_filter::add_db_rewrite(const char* from_db, const char* to_db)
{
@@ -355,25 +486,59 @@ Rpl_filter::add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
}
-void
+int
+Rpl_filter::add_string_list(I_List<i_string> *list, const char* spec)
+{
+ char *str;
+ i_string *node;
+
+ if (! (str= my_strdup(spec, MYF(MY_WME))))
+ return true;
+
+ if (! (node= new i_string(str)))
+ {
+ my_free(str);
+ return true;
+ }
+
+ list->push_back(node);
+
+ return false;
+}
+
+
+int
Rpl_filter::add_do_db(const char* table_spec)
{
DBUG_ENTER("Rpl_filter::add_do_db");
- i_string *db = new i_string(table_spec);
- do_db.push_back(db);
- DBUG_VOID_RETURN;
+ DBUG_RETURN(add_string_list(&do_db, table_spec));
}
-void
+int
Rpl_filter::add_ignore_db(const char* table_spec)
{
DBUG_ENTER("Rpl_filter::add_ignore_db");
- i_string *db = new i_string(table_spec);
- ignore_db.push_back(db);
- DBUG_VOID_RETURN;
+ DBUG_RETURN(add_string_list(&ignore_db, table_spec));
}
+
+int
+Rpl_filter::set_do_db(const char* db_spec)
+{
+ free_string_list(&do_db);
+ return parse_filter_rule(db_spec, &Rpl_filter::add_do_db);
+}
+
+
+int
+Rpl_filter::set_ignore_db(const char* db_spec)
+{
+ free_string_list(&ignore_db);
+ return parse_filter_rule(db_spec, &Rpl_filter::add_ignore_db);
+}
+
+
extern "C" uchar *get_table_key(const uchar *, size_t *, my_bool);
extern "C" void free_table_ent(void* a);
@@ -448,6 +613,23 @@ Rpl_filter::free_string_array(DYNAMIC_ARRAY *a)
}
+void
+Rpl_filter::free_string_list(I_List<i_string> *l)
+{
+ void *ptr;
+ i_string *tmp;
+
+ while ((tmp= l->get()))
+ {
+ ptr= (void *) tmp->ptr;
+ my_free(ptr);
+ delete tmp;
+ }
+
+ l->empty();
+}
+
+
/*
Builds a String from a HASH of TABLE_RULE_ENT. Cannot be used for any other
hash, as it assumes that the hash entries are TABLE_RULE_ENT.
@@ -564,3 +746,37 @@ Rpl_filter::get_ignore_db()
{
return &ignore_db;
}
+
+
+void
+Rpl_filter::db_rule_ent_list_to_str(String* str, I_List<i_string>* list)
+{
+ I_List_iterator<i_string> it(*list);
+ i_string* s;
+
+ str->length(0);
+
+ while ((s= it++))
+ {
+ str->append(s->ptr);
+ str->append(',');
+ }
+
+ // Remove last ','
+ if (!str->is_empty())
+ str->chop();
+}
+
+
+void
+Rpl_filter::get_do_db(String* str)
+{
+ db_rule_ent_list_to_str(str, get_do_db());
+}
+
+
+void
+Rpl_filter::get_ignore_db(String* str)
+{
+ db_rule_ent_list_to_str(str, get_ignore_db());
+}
diff --git a/sql/rpl_filter.h b/sql/rpl_filter.h
index d32fb36d6fb..2eb0340b714 100644
--- a/sql/rpl_filter.h
+++ b/sql/rpl_filter.h
@@ -61,11 +61,20 @@ public:
int add_do_table(const char* table_spec);
int add_ignore_table(const char* table_spec);
+ int set_do_table(const char* table_spec);
+ int set_ignore_table(const char* table_spec);
+
int add_wild_do_table(const char* table_spec);
int add_wild_ignore_table(const char* table_spec);
- void add_do_db(const char* db_spec);
- void add_ignore_db(const char* db_spec);
+ int set_wild_do_table(const char* table_spec);
+ int set_wild_ignore_table(const char* table_spec);
+
+ int add_do_db(const char* db_spec);
+ int add_ignore_db(const char* db_spec);
+
+ int set_do_db(const char* db_spec);
+ int set_ignore_db(const char* db_spec);
void add_db_rewrite(const char* from_db, const char* to_db);
@@ -83,6 +92,9 @@ public:
I_List<i_string>* get_do_db();
I_List<i_string>* get_ignore_db();
+ void get_do_db(String* str);
+ void get_ignore_db(String* str);
+
private:
bool table_rules_on;
@@ -92,13 +104,21 @@ private:
int add_table_rule(HASH* h, const char* table_spec);
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec);
+ typedef int (Rpl_filter::*Add_filter)(char const*);
+
+ int parse_filter_rule(const char* spec, Add_filter func);
+
void free_string_array(DYNAMIC_ARRAY *a);
+ void free_string_list(I_List<i_string> *l);
void table_rule_ent_hash_to_str(String* s, HASH* h, bool inited);
void table_rule_ent_dynamic_array_to_str(String* s, DYNAMIC_ARRAY* a,
bool inited);
+ void db_rule_ent_list_to_str(String* s, I_List<i_string>* l);
TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len);
+ int add_string_list(I_List<i_string> *list, const char* spec);
+
/*
Those 4 structures below are uninitialized memory unless the
corresponding *_inited variables are "true".
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 72e9525db72..a16769459b4 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -3255,6 +3255,150 @@ static Sys_var_mybool Sys_relay_log_recovery(
"processed",
GLOBAL_VAR(relay_log_recovery), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+bool Sys_var_rpl_filter::do_check(THD *thd, set_var *var)
+{
+ bool status;
+
+ mysql_mutex_lock(&LOCK_active_mi);
+ mysql_mutex_lock(&active_mi->rli.run_lock);
+
+ status= active_mi->rli.slave_running;
+
+ mysql_mutex_unlock(&active_mi->rli.run_lock);
+ mysql_mutex_unlock(&LOCK_active_mi);
+
+ if (status)
+ my_error(ER_SLAVE_MUST_STOP, MYF(0));
+ else
+ status= Sys_var_charptr::do_string_check(thd, var, charset(thd));
+
+ return status;
+}
+
+bool Sys_var_rpl_filter::global_update(THD *thd, set_var *var)
+{
+ bool slave_running, status= false;
+
+ mysql_mutex_lock(&LOCK_active_mi);
+ mysql_mutex_lock(&active_mi->rli.run_lock);
+
+ if (! (slave_running= active_mi->rli.slave_running))
+ status= set_filter_value(var->save_result.string_value.str);
+
+ mysql_mutex_unlock(&active_mi->rli.run_lock);
+ mysql_mutex_unlock(&LOCK_active_mi);
+
+ if (slave_running)
+ my_error(ER_SLAVE_MUST_STOP, MYF(0));
+
+ return slave_running || status;
+}
+
+bool Sys_var_rpl_filter::set_filter_value(const char *value)
+{
+ bool status= true;
+
+ switch (opt_id) {
+ case OPT_REPLICATE_DO_DB:
+ status= rpl_filter->set_do_db(value);
+ break;
+ case OPT_REPLICATE_DO_TABLE:
+ status= rpl_filter->set_do_table(value);
+ break;
+ case OPT_REPLICATE_IGNORE_DB:
+ status= rpl_filter->set_ignore_db(value);
+ break;
+ case OPT_REPLICATE_IGNORE_TABLE:
+ status= rpl_filter->set_ignore_table(value);
+ break;
+ case OPT_REPLICATE_WILD_DO_TABLE:
+ status= rpl_filter->set_wild_do_table(value);
+ break;
+ case OPT_REPLICATE_WILD_IGNORE_TABLE:
+ status= rpl_filter->set_wild_ignore_table(value);
+ break;
+ }
+
+ return status;
+}
+
+uchar *Sys_var_rpl_filter::global_value_ptr(THD *thd, LEX_STRING *base)
+{
+ char buf[256];
+ String tmp(buf, sizeof(buf), &my_charset_bin);
+
+ tmp.length(0);
+
+ mysql_mutex_lock(&LOCK_active_mi);
+ mysql_mutex_lock(&active_mi->rli.run_lock);
+
+ switch (opt_id) {
+ case OPT_REPLICATE_DO_DB:
+ rpl_filter->get_do_db(&tmp);
+ break;
+ case OPT_REPLICATE_DO_TABLE:
+ rpl_filter->get_do_table(&tmp);
+ break;
+ case OPT_REPLICATE_IGNORE_DB:
+ rpl_filter->get_ignore_db(&tmp);
+ break;
+ case OPT_REPLICATE_IGNORE_TABLE:
+ rpl_filter->get_ignore_table(&tmp);
+ break;
+ case OPT_REPLICATE_WILD_DO_TABLE:
+ rpl_filter->get_wild_do_table(&tmp);
+ break;
+ case OPT_REPLICATE_WILD_IGNORE_TABLE:
+ rpl_filter->get_wild_ignore_table(&tmp);
+ break;
+ }
+
+ mysql_mutex_unlock(&active_mi->rli.run_lock);
+ mysql_mutex_unlock(&LOCK_active_mi);
+
+ return (uchar *) thd->strmake(tmp.ptr(), tmp.length());
+}
+
+static Sys_var_rpl_filter Sys_replicate_do_db(
+ "replicate_do_db", OPT_REPLICATE_DO_DB,
+ "Tell the slave to restrict replication to updates of tables "
+ "whose names appear in the comma-separated list. For "
+ "statement-based replication, only the default database (that "
+ "is, the one selected by USE) is considered, not any explicitly "
+ "mentioned tables in the query. For row-based replication, the "
+ "actual names of table(s) being updated are checked.");
+
+static Sys_var_rpl_filter Sys_replicate_do_table(
+ "replicate_do_table", OPT_REPLICATE_DO_TABLE,
+ "Tells the slave to restrict replication to tables in the "
+ "comma-separated list.");
+
+static Sys_var_rpl_filter Sys_replicate_ignore_db(
+ "replicate_ignore_db", OPT_REPLICATE_IGNORE_DB,
+ "Tell the slave to restrict replication to updates of tables "
+ "whose names do not appear in the comma-separated list. For "
+ "statement-based replication, only the default database (that "
+ "is, the one selected by USE) is considered, not any explicitly "
+ "mentioned tables in the query. For row-based replication, the "
+ "actual names of table(s) being updated are checked.");
+
+static Sys_var_rpl_filter Sys_replicate_ignore_table(
+ "replicate_ignore_table", OPT_REPLICATE_IGNORE_TABLE,
+ "Tells the slave thread not to replicate any statement that "
+ "updates the specified table, even if any other tables might be "
+ "updated by the same statement.");
+
+static Sys_var_rpl_filter Sys_replicate_wild_do_table(
+ "replicate_wild_do_table", OPT_REPLICATE_WILD_DO_TABLE,
+ "Tells the slave thread to restrict replication to statements "
+ "where any of the updated tables match the specified database "
+ "and table name patterns.");
+
+static Sys_var_rpl_filter Sys_replicate_wild_ignore_table(
+ "replicate_wild_ignore_table", OPT_REPLICATE_WILD_IGNORE_TABLE,
+ "Tells the slave thread to not replicate to the tables that "
+ "match the given wildcard pattern.");
+
static Sys_var_charptr Sys_slave_load_tmpdir(
"slave_load_tmpdir", "The location where the slave should put "
"its temporary files when replicating a LOAD DATA INFILE command",
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index f2a2966e6a2..647403a96e4 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -430,11 +430,11 @@ public:
my_free(global_var(char*));
flags&= ~ALLOCATED;
}
- bool do_check(THD *thd, set_var *var)
+ static bool do_string_check(THD *thd, set_var *var, CHARSET_INFO *charset)
{
char buff[STRING_BUFFER_USUAL_SIZE], buff2[STRING_BUFFER_USUAL_SIZE];
- String str(buff, sizeof(buff), charset(thd));
- String str2(buff2, sizeof(buff2), charset(thd)), *res;
+ String str(buff, sizeof(buff), charset);
+ String str2(buff2, sizeof(buff2), charset), *res;
if (!(res=var->value->val_str(&str)))
var->save_result.string_value.str= 0;
@@ -442,10 +442,10 @@ public:
{
uint32 unused;
if (String::needs_conversion(res->length(), res->charset(),
- charset(thd), &unused))
+ charset, &unused))
{
uint errors;
- str2.copy(res->ptr(), res->length(), res->charset(), charset(thd),
+ str2.copy(res->ptr(), res->length(), res->charset(), charset,
&errors);
res=&str2;
@@ -456,6 +456,8 @@ public:
return false;
}
+ bool do_check(THD *thd, set_var *var)
+ { return do_string_check(thd, var, charset(thd)); }
bool session_update(THD *thd, set_var *var)
{
DBUG_ASSERT(FALSE);
@@ -550,6 +552,44 @@ protected:
}
};
+class Sys_var_rpl_filter: public sys_var
+{
+private:
+ int opt_id;
+
+public:
+ Sys_var_rpl_filter(const char *name, int getopt_id, const char *comment)
+ : sys_var(&all_sys_vars, name, comment, sys_var::GLOBAL, 0, -1,
+ NO_ARG, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG,
+ NULL, NULL, 0, NULL), opt_id(getopt_id)
+ {
+ option.var_type= GET_STR;
+ }
+
+ bool check_update_type(Item_result type)
+ { return type != STRING_RESULT; }
+
+ bool do_check(THD *thd, set_var *var);
+
+ void session_save_default(THD *thd, set_var *var)
+ { DBUG_ASSERT(FALSE); }
+
+ void global_save_default(THD *thd, set_var *var)
+ { DBUG_ASSERT(FALSE); }
+
+ bool session_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(FALSE);
+ return true;
+ }
+
+ bool global_update(THD *thd, set_var *var);
+
+protected:
+ uchar *global_value_ptr(THD *thd, LEX_STRING *base);
+ bool set_filter_value(const char *value);
+};
+
/**
The class for string variables. Useful for strings that aren't necessarily
\0-terminated. Otherwise the same as Sys_var_charptr.