summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLixun Peng <P.Linux@163.com>2013-04-16 19:43:28 +0800
committerLixun Peng <P.Linux@163.com>2013-04-16 19:43:28 +0800
commit82eedf4e9745aa570b88b225941120475f9f74c9 (patch)
treec88a6bcc7efd700de4e64af88edb7e3d02f9dd1b
parentaa052eeb1a020b8a198e892a7ca0d8f4a3bcd5f1 (diff)
downloadmariadb-git-82eedf4e9745aa570b88b225941120475f9f74c9.tar.gz
Makeing rpl_filter for each Master_info.
Users can set different repplication filter rules for each replication connection, in my.cnf or command line. But the rules set online will not record in master.info, it means if users restart MySQL, these rules will lose. So if users wantn't their replication filter rules lose, they should write the rules in my.cnf. Users can set rules by 2 ways: 1. Online SET command, "SET connection_name.replication_filter_settings = rules;". 2. In my.cnf, "connection_name.replication_filter_settings = rules". If no connection_name in my.cnf, this rule will apply for ALL replication connection. If no connetion_name in SET statement, this rull will apply for default_connection_name.
-rw-r--r--libmysqld/lib_sql.cc2
-rw-r--r--sql/keycaches.cc66
-rw-r--r--sql/keycaches.h12
-rw-r--r--sql/log_event.cc11
-rw-r--r--sql/mysqld.cc60
-rw-r--r--sql/rpl_filter.cc12
-rw-r--r--sql/rpl_filter.h3
-rw-r--r--sql/rpl_mi.cc68
-rw-r--r--sql/rpl_mi.h4
-rw-r--r--sql/rpl_rli.cc2
-rw-r--r--sql/slave.cc6
-rw-r--r--sql/sql_acl.cc5
-rw-r--r--sql/sql_class.h4
-rw-r--r--sql/sql_parse.cc11
-rw-r--r--sql/sys_vars.cc65
-rw-r--r--sql/sys_vars.h5
16 files changed, 300 insertions, 36 deletions
diff --git a/libmysqld/lib_sql.cc b/libmysqld/lib_sql.cc
index 5e798674c2d..16c6cc94f1b 100644
--- a/libmysqld/lib_sql.cc
+++ b/libmysqld/lib_sql.cc
@@ -604,7 +604,7 @@ int init_embedded_server(int argc, char **argv, char **groups)
// FIXME initialize binlog_filter and rpl_filter if not already done
// corresponding delete is in clean_up()
if(!binlog_filter) binlog_filter = new Rpl_filter;
- if(!rpl_filter) rpl_filter = new Rpl_filter;
+ if(!global_rpl_filter) global_rpl_filter = new Rpl_filter;
if (opt_init_file)
{
diff --git a/sql/keycaches.cc b/sql/keycaches.cc
index 84ed67d00f0..9e4b943dc83 100644
--- a/sql/keycaches.cc
+++ b/sql/keycaches.cc
@@ -20,6 +20,7 @@
****************************************************************************/
NAMED_ILIST key_caches;
+NAMED_ILIST rpl_filters;
/**
ilink (intrusive list element) with a name
@@ -66,6 +67,23 @@ uchar* find_named(I_List<NAMED_ILINK> *list, const char *name, uint length,
}
+bool NAMED_ILIST::delete_element(const char *name, uint length, void (*free_element)(const char *name, uchar*))
+{
+ I_List_iterator<NAMED_ILINK> it(*this);
+ NAMED_ILINK *element;
+ DBUG_ENTER("NAMED_ILIST::delete_element");
+ while ((element= it++))
+ {
+ if (element->cmp(name, length))
+ {
+ (*free_element)(element->name, element->data);
+ delete element;
+ DBUG_RETURN(0);
+ }
+ }
+ DBUG_RETURN(1);
+}
+
void NAMED_ILIST::delete_elements(void (*free_element)(const char *name, uchar*))
{
NAMED_ILINK *element;
@@ -159,3 +177,51 @@ bool process_key_caches(process_key_cache_t func, void *param)
return res != 0;
}
+/* Rpl_filter functions */
+
+LEX_STRING default_rpl_filter_base= {C_STRING_WITH_LEN("")};
+
+Rpl_filter *get_rpl_filter(LEX_STRING *filter_name)
+{
+ if (!filter_name->length)
+ filter_name= &default_rpl_filter_base;
+ return ((Rpl_filter*) find_named(&rpl_filters,
+ filter_name->str, filter_name->length, 0));
+}
+
+Rpl_filter *create_rpl_filter(const char *name, uint length)
+{
+ Rpl_filter *filter;
+ DBUG_ENTER("create_rpl_filter");
+ DBUG_PRINT("enter",("name: %.*s", length, name));
+
+ filter= new Rpl_filter;
+ if (filter)
+ {
+ if (!new NAMED_ILINK(&rpl_filters, name, length, (uchar*) filter))
+ {
+ delete filter;
+ filter= 0;
+ }
+ }
+ DBUG_RETURN(filter);
+}
+
+
+Rpl_filter *get_or_create_rpl_filter(const char *name, uint length)
+{
+ LEX_STRING rpl_filter_name;
+ Rpl_filter *filter;
+
+ rpl_filter_name.str= (char *) name;
+ rpl_filter_name.length= length;
+ if (!(filter= get_rpl_filter(&rpl_filter_name)))
+ filter= create_rpl_filter(name, length);
+ return filter;
+}
+
+void free_rpl_filter(const char *name, Rpl_filter *filter)
+{
+ delete filter;
+}
+
diff --git a/sql/keycaches.h b/sql/keycaches.h
index 04d3f6145e7..2d52cb28973 100644
--- a/sql/keycaches.h
+++ b/sql/keycaches.h
@@ -18,6 +18,7 @@
#include "sql_list.h"
#include <keycache.h>
+#include <rpl_filter.h>
extern "C"
{
@@ -30,8 +31,10 @@ class NAMED_ILIST: public I_List<NAMED_ILINK>
{
public:
void delete_elements(void (*free_element)(const char*, uchar*));
+ bool delete_element(const char *name, uint length, void (*free_element)(const char*, uchar*));
};
+/* For key cache */
extern LEX_STRING default_key_cache_base;
extern KEY_CACHE zero_key_cache;
extern NAMED_ILIST key_caches;
@@ -42,4 +45,13 @@ KEY_CACHE *get_or_create_key_cache(const char *name, uint length);
void free_key_cache(const char *name, KEY_CACHE *key_cache);
bool process_key_caches(process_key_cache_t func, void *param);
+/* For Rpl_filter */
+extern LEX_STRING default_rpl_filter_base;
+extern NAMED_ILIST rpl_filters;
+
+Rpl_filter *create_rpl_filter(const char *name, uint length);
+Rpl_filter *get_rpl_filter(LEX_STRING *filter_name);
+Rpl_filter *get_or_create_rpl_filter(const char *name, uint length);
+void free_rpl_filter(const char *name, Rpl_filter *filter);
+
#endif /* KEYCACHES_INCLUDED */
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 104ea948cfc..f65d96eb9a8 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -3756,6 +3756,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
HA_CREATE_INFO db_options;
uint64 sub_id= 0;
rpl_gtid gtid;
+ Rpl_filter *rpl_filter= rli->mi->rpl_filter;
DBUG_ENTER("Query_log_event::do_apply_event");
/*
@@ -5439,6 +5440,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
bool use_rli_only_for_errors)
{
LEX_STRING new_db;
+ Rpl_filter *rpl_filter= rli->mi->rpl_filter;
DBUG_ENTER("Load_log_event::do_apply_event");
new_db.length= db_len;
@@ -10044,8 +10046,8 @@ check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list)
enum_tbl_map_status res= OK_TO_PROCESS;
if (rli->sql_thd->slave_thread /* filtering is for slave only */ &&
- (!rpl_filter->db_ok(table_list->db) ||
- (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))))
+ (!rli->mi->rpl_filter->db_ok(table_list->db) ||
+ (rli->mi->rpl_filter->is_on() && !rli->mi->rpl_filter->tables_ok("", table_list))))
res= FILTERED_OUT;
else
{
@@ -10079,6 +10081,7 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
char *db_mem, *tname_mem;
size_t dummy_len;
void *memory;
+ Rpl_filter *filter;
DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
DBUG_ASSERT(rli->sql_thd == thd);
@@ -10092,7 +10095,9 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
NullS)))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
- strmov(db_mem, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
+ /* call from mysql_client_binlog_statement() will not set rli->mi */
+ filter= rli->sql_thd->slave_thread ? rli->mi->rpl_filter : global_rpl_filter;
+ strmov(db_mem, filter->get_rewrite_db(m_dbnam, &dummy_len));
strmov(tname_mem, m_tblnam);
table_list->init_one_table(db_mem, strlen(db_mem),
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index f794dc6ffa5..2d9923d3ef0 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -621,7 +621,8 @@ MYSQL_FILE *bootstrap_file;
int bootstrap_error;
I_List<THD> threads;
-Rpl_filter* rpl_filter;
+Rpl_filter* cur_rpl_filter;
+Rpl_filter* global_rpl_filter;
Rpl_filter* binlog_filter;
THD *first_global_thread()
@@ -1882,7 +1883,7 @@ void clean_up(bool print_message)
#endif
my_uuid_end();
delete binlog_filter;
- delete rpl_filter;
+ delete global_rpl_filter;
end_ssl();
vio_end();
my_regex_end();
@@ -3587,9 +3588,9 @@ static int init_common_variables()
max_system_variables.pseudo_thread_id= (ulong)~0;
server_start_time= flush_status_time= my_time(0);
- rpl_filter= new Rpl_filter;
+ global_rpl_filter= new Rpl_filter;
binlog_filter= new Rpl_filter;
- if (!rpl_filter || !binlog_filter)
+ if (!global_rpl_filter || !binlog_filter)
{
sql_perror("Could not allocate replication and binlog filters");
return 1;
@@ -5125,6 +5126,9 @@ int mysqld_main(int argc, char **argv)
create_shutdown_thread();
start_handle_manager();
+ /* Copy default global rpl_filter to global_rpl_filter */
+ copy_filter_setting(global_rpl_filter, get_or_create_rpl_filter("", 0));
+
/*
init_slave() must be called after the thread keys are created.
Some parts of the code (e.g. SHOW STATUS LIKE 'slave_running' and other
@@ -6541,28 +6545,28 @@ struct my_option my_long_options[]=
"while having selected a different or no database. If you need cross "
"database updates to work, make sure you have 3.23.28 or later, and use "
"replicate-wild-do-table=db_name.%.",
- 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"replicate-do-table", OPT_REPLICATE_DO_TABLE,
"Tells the slave thread to restrict replication to the specified table. "
"To specify more than one table, use the directive multiple times, once "
"for each table. This will work for cross-database updates, in contrast "
- "to replicate-do-db.", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ "to replicate-do-db.", 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"replicate-ignore-db", OPT_REPLICATE_IGNORE_DB,
"Tells the slave thread to not replicate to the specified database. To "
"specify more than one database to ignore, use the directive multiple "
"times, once for each database. This option will not work if you use "
"cross database updates. If you need cross database updates to work, "
"make sure you have 3.23.28 or later, and use replicate-wild-ignore-"
- "table=db_name.%. ", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ "table=db_name.%. ", 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"replicate-ignore-table", OPT_REPLICATE_IGNORE_TABLE,
"Tells the slave thread to not replicate to the specified table. To specify "
"more than one table to ignore, use the directive multiple times, once for "
"each table. This will work for cross-database updates, in contrast to "
- "replicate-ignore-db.", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ "replicate-ignore-db.", 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"replicate-rewrite-db", OPT_REPLICATE_REWRITE_DB,
"Updates to a database with a different name than the original. Example: "
"replicate-rewrite-db=master_db_name->slave_db_name.",
- 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#ifdef HAVE_REPLICATION
{"replicate-same-server-id", 0,
"In replication, if set to 1, do not skip events having our server id. "
@@ -6578,7 +6582,7 @@ struct my_option my_long_options[]=
"database updates. Example: replicate-wild-do-table=foo%.bar% will "
"replicate only updates to tables in all databases that start with foo "
"and whose table names start with bar.",
- 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"replicate-wild-ignore-table", OPT_REPLICATE_WILD_IGNORE_TABLE,
"Tells the slave thread to not replicate to the tables that match the "
"given wildcard pattern. To specify more than one table to ignore, use "
@@ -6586,7 +6590,7 @@ struct my_option my_long_options[]=
"cross-database updates. Example: replicate-wild-ignore-table=foo%.bar% "
"will not do updates to tables in databases that start with foo and whose "
"table names start with bar.",
- 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"safe-mode", OPT_SAFE, "Skip some optimize stages (for testing). Deprecated.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{"safe-user-create", 0,
@@ -7752,12 +7756,12 @@ mysqld_get_one_option(int optid,
#ifdef HAVE_REPLICATION
case (int)OPT_REPLICATE_IGNORE_DB:
{
- rpl_filter->add_ignore_db(argument);
+ cur_rpl_filter->add_ignore_db(argument);
break;
}
case (int)OPT_REPLICATE_DO_DB:
{
- rpl_filter->add_do_db(argument);
+ cur_rpl_filter->add_do_db(argument);
break;
}
case (int)OPT_REPLICATE_REWRITE_DB:
@@ -7788,7 +7792,7 @@ mysqld_get_one_option(int optid,
return 1;
}
- rpl_filter->add_db_rewrite(key, val);
+ cur_rpl_filter->add_db_rewrite(key, val);
break;
}
@@ -7804,7 +7808,7 @@ mysqld_get_one_option(int optid,
}
case (int)OPT_REPLICATE_DO_TABLE:
{
- if (rpl_filter->add_do_table(argument))
+ if (cur_rpl_filter->add_do_table(argument))
{
sql_print_error("Could not add do table rule '%s'!\n", argument);
return 1;
@@ -7813,7 +7817,7 @@ mysqld_get_one_option(int optid,
}
case (int)OPT_REPLICATE_WILD_DO_TABLE:
{
- if (rpl_filter->add_wild_do_table(argument))
+ if (cur_rpl_filter->add_wild_do_table(argument))
{
sql_print_error("Could not add do table rule '%s'!\n", argument);
return 1;
@@ -7822,7 +7826,7 @@ mysqld_get_one_option(int optid,
}
case (int)OPT_REPLICATE_WILD_IGNORE_TABLE:
{
- if (rpl_filter->add_wild_ignore_table(argument))
+ if (cur_rpl_filter->add_wild_ignore_table(argument))
{
sql_print_error("Could not add ignore table rule '%s'!\n", argument);
return 1;
@@ -7831,7 +7835,7 @@ mysqld_get_one_option(int optid,
}
case (int)OPT_REPLICATE_IGNORE_TABLE:
{
- if (rpl_filter->add_ignore_table(argument))
+ if (cur_rpl_filter->add_ignore_table(argument))
{
sql_print_error("Could not add ignore table rule '%s'!\n", argument);
return 1;
@@ -7952,7 +7956,7 @@ mysqld_get_one_option(int optid,
C_MODE_START
static void*
-mysql_getopt_value(const char *keyname, uint key_length,
+mysql_getopt_value(const char *name, uint length,
const struct my_option *option, int *error)
{
if (error)
@@ -7965,7 +7969,7 @@ mysql_getopt_value(const char *keyname, uint key_length,
case OPT_KEY_CACHE_PARTITIONS:
{
KEY_CACHE *key_cache;
- if (!(key_cache= get_or_create_key_cache(keyname, key_length)))
+ if (!(key_cache= get_or_create_key_cache(name, length)))
{
if (error)
*error= EXIT_OUT_OF_MEMORY;
@@ -7984,6 +7988,22 @@ mysql_getopt_value(const char *keyname, uint key_length,
return (uchar**) &key_cache->param_partitions;
}
}
+ case OPT_REPLICATE_DO_DB:
+ case OPT_REPLICATE_DO_TABLE:
+ case OPT_REPLICATE_IGNORE_DB:
+ case OPT_REPLICATE_IGNORE_TABLE:
+ case OPT_REPLICATE_WILD_DO_TABLE:
+ case OPT_REPLICATE_WILD_IGNORE_TABLE:
+ case OPT_REPLICATE_REWRITE_DB:
+ {
+ /* Store current filter for mysqld_get_one_option() */
+ if (!(cur_rpl_filter= get_or_create_rpl_filter(name, length)))
+ {
+ if (error)
+ *error= EXIT_OUT_OF_MEMORY;
+ }
+ return 0;
+ }
}
return option->value;
}
diff --git a/sql/rpl_filter.cc b/sql/rpl_filter.cc
index f2bd036896d..2e7a2242d45 100644
--- a/sql/rpl_filter.cc
+++ b/sql/rpl_filter.cc
@@ -734,6 +734,18 @@ Rpl_filter::get_rewrite_db(const char* db, size_t *new_len)
}
+void
+Rpl_filter::copy_rewrite_db(Rpl_filter *from)
+{
+ I_List_iterator<i_string_pair> it(from->rewrite_db);
+ i_string_pair* tmp;
+ DBUG_ASSERT(rewrite_db.is_empty());
+
+ /* TODO: Add memory checking here and in all add_xxxx functions ! */
+ while ((tmp=it++))
+ add_db_rewrite(tmp->key, tmp->val);
+}
+
I_List<i_string>*
Rpl_filter::get_do_db()
{
diff --git a/sql/rpl_filter.h b/sql/rpl_filter.h
index 2eb0340b714..65d11cfb6e6 100644
--- a/sql/rpl_filter.h
+++ b/sql/rpl_filter.h
@@ -88,6 +88,7 @@ public:
bool rewrite_db_is_empty();
const char* get_rewrite_db(const char* db, size_t *new_len);
+ void copy_rewrite_db(Rpl_filter *from);
I_List<i_string>* get_do_db();
I_List<i_string>* get_ignore_db();
@@ -139,7 +140,7 @@ private:
I_List<i_string_pair> rewrite_db;
};
-extern Rpl_filter *rpl_filter;
+extern Rpl_filter *global_rpl_filter;
extern Rpl_filter *binlog_filter;
#endif // RPL_FILTER_H
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index fdff61fafec..38fcc54e891 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -60,6 +60,13 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
connection_name.length+1);
my_casedn_str(system_charset_info, cmp_connection_name.str);
}
+ /* When MySQL restarted, all Rpl_filter settings which aren't in the my.cnf
+ * will lose. So if you want a setting will not lose after restarting, you
+ * should add them into my.cnf
+ * */
+ rpl_filter= get_or_create_rpl_filter(connection_name.str,
+ connection_name.length);
+ copy_filter_setting(rpl_filter, global_rpl_filter);
my_init_dynamic_array(&ignore_server_ids,
sizeof(global_system_variables.server_id), 16, 16,
@@ -78,6 +85,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
Master_info::~Master_info()
{
+ rpl_filters.delete_element(connection_name.str, connection_name.length,
+ (void (*)(const char*, uchar*)) free_rpl_filter);
my_free(connection_name.str);
delete_dynamic(&ignore_server_ids);
mysql_mutex_destroy(&run_lock);
@@ -711,6 +720,65 @@ void create_logfile_name_with_suffix(char *res_file_name, size_t length,
}
}
+void copy_filter_setting(Rpl_filter* dst_filter, Rpl_filter* src_filter)
+{
+ char buf[256];
+ String tmp(buf, sizeof(buf), &my_charset_bin);
+
+ dst_filter->get_do_db(&tmp);
+ if (tmp.is_empty())
+ {
+ src_filter->get_do_db(&tmp);
+ if (!tmp.is_empty())
+ dst_filter->set_do_db(tmp.ptr());
+ }
+
+ dst_filter->get_do_table(&tmp);
+ if (tmp.is_empty())
+ {
+ src_filter->get_do_table(&tmp);
+ if (!tmp.is_empty())
+ dst_filter->set_do_table(tmp.ptr());
+ }
+
+ dst_filter->get_ignore_db(&tmp);
+ if (tmp.is_empty())
+ {
+ src_filter->get_ignore_db(&tmp);
+ if (!tmp.is_empty())
+ dst_filter->set_ignore_db(tmp.ptr());
+ }
+
+ dst_filter->get_ignore_table(&tmp);
+ if (tmp.is_empty())
+ {
+ src_filter->get_ignore_table(&tmp);
+ if (!tmp.is_empty())
+ dst_filter->set_ignore_table(tmp.ptr());
+ }
+
+ dst_filter->get_wild_do_table(&tmp);
+ if (tmp.is_empty())
+ {
+ src_filter->get_wild_do_table(&tmp);
+ if (!tmp.is_empty())
+ dst_filter->set_wild_do_table(tmp.ptr());
+ }
+
+ dst_filter->get_wild_ignore_table(&tmp);
+ if (tmp.is_empty())
+ {
+ src_filter->get_wild_ignore_table(&tmp);
+ if (!tmp.is_empty())
+ dst_filter->set_wild_ignore_table(tmp.ptr());
+ }
+
+ if (dst_filter->rewrite_db_is_empty())
+ {
+ if (!src_filter->rewrite_db_is_empty())
+ dst_filter->copy_rewrite_db(src_filter);
+ }
+}
Master_info_index::Master_info_index()
{
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index b6a3e7d91b9..64501e96f00 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -21,6 +21,8 @@
#include "rpl_rli.h"
#include "rpl_reporting.h"
#include "my_sys.h"
+#include "rpl_filter.h"
+#include "keycaches.h"
typedef struct st_mysql MYSQL;
@@ -92,6 +94,7 @@ class Master_info : public Slave_reporting_capability
uint32 file_id; /* for 3.23 load data infile */
Relay_log_info rli;
uint port;
+ Rpl_filter* rpl_filter; /* Each replication can set its filter rule*/
/*
to hold checksum alg in use until IO thread has received FD.
Initialized to novalue, then set to the queried from master
@@ -141,6 +144,7 @@ int flush_master_info(Master_info* mi,
bool flush_relay_log_cache,
bool need_lock_relay_log);
int change_master_server_id_cmp(ulong *id1, ulong *id2);
+void copy_filter_setting(Rpl_filter* dst_filter, Rpl_filter* src_filter);
/*
Multi master are handled trough this struct.
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 6d53e6c3187..11c6e54c8f8 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -61,7 +61,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0),
last_event_start_time(0), deferred_events(NULL),m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false),
- m_annotate_event(0)
+ m_annotate_event(0), mi(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
diff --git a/sql/slave.cc b/sql/slave.cc
index 8f1d8669770..2e436d5e8b4 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -2302,6 +2302,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
DBUG_PRINT("info",("host is set: '%s'", mi->host));
String *packet= &thd->packet;
Protocol *protocol= thd->protocol;
+ Rpl_filter *rpl_filter= mi->rpl_filter;
char buf[256];
String tmp(buf, sizeof(buf), &my_charset_bin);
@@ -3787,6 +3788,7 @@ pthread_handler_t handle_slave_sql(void *arg)
thd = new THD; // note that contructor of THD uses DBUG_ !
thd->thread_stack = (char*)&thd; // remember where our stack is
+ thd->rpl_filter = mi->rpl_filter;
DBUG_ASSERT(rli->inited);
DBUG_ASSERT(rli->mi == mi);
@@ -3817,7 +3819,7 @@ pthread_handler_t handle_slave_sql(void *arg)
}
thd->init_for_queries();
thd->rli_slave= rli;
- if ((rli->deferred_events_collecting= rpl_filter->is_on()))
+ if ((rli->deferred_events_collecting= mi->rpl_filter->is_on()))
{
rli->deferred_events= new Deferred_log_events(rli);
}
@@ -4145,7 +4147,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
if (unlikely(!cev->is_valid()))
DBUG_RETURN(1);
- if (!rpl_filter->db_ok(cev->db))
+ if (!mi->rpl_filter->db_ok(cev->db))
{
skip_load_data_infile(net);
DBUG_RETURN(0);
diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc
index d9470094c63..7810bdb16e2 100644
--- a/sql/sql_acl.cc
+++ b/sql/sql_acl.cc
@@ -1905,6 +1905,7 @@ bool change_password(THD *thd, const char *host, const char *user,
{
TABLE_LIST tables;
TABLE *table;
+ Rpl_filter *rpl_filter= thd->rpl_filter;
/* Buffer should be extended when password length is extended. */
char buff[512];
ulong query_length;
@@ -3593,6 +3594,7 @@ int mysql_table_grant(THD *thd, TABLE_LIST *table_list,
TABLE_LIST tables[3];
bool create_new_users=0;
char *db_name, *table_name;
+ Rpl_filter *rpl_filter= thd->rpl_filter;
DBUG_ENTER("mysql_table_grant");
if (!initialized)
@@ -3869,6 +3871,7 @@ bool mysql_routine_grant(THD *thd, TABLE_LIST *table_list, bool is_proc,
TABLE_LIST tables[2];
bool create_new_users=0, result=0;
char *db_name, *table_name;
+ Rpl_filter *rpl_filter= thd->rpl_filter;
DBUG_ENTER("mysql_routine_grant");
if (!initialized)
@@ -4009,6 +4012,7 @@ bool mysql_grant(THD *thd, const char *db, List <LEX_USER> &list,
char tmp_db[SAFE_NAME_LEN+1];
bool create_new_users=0;
TABLE_LIST tables[2];
+ Rpl_filter *rpl_filter= thd->rpl_filter;
DBUG_ENTER("mysql_grant");
if (!initialized)
@@ -5758,6 +5762,7 @@ void get_mqh(const char *user, const char *host, USER_CONN *uc)
#define GRANT_TABLES 6
int open_grant_tables(THD *thd, TABLE_LIST *tables)
{
+ Rpl_filter *rpl_filter= thd->rpl_filter;
DBUG_ENTER("open_grant_tables");
if (!initialized)
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 221c0d3bd51..8e2bd59da57 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -47,6 +47,7 @@
class Reprepare_observer;
class Relay_log_info;
+class Rpl_filter;
class Query_log_event;
class Load_log_event;
@@ -1588,6 +1589,9 @@ public:
/* Slave applier execution context */
Relay_log_info* rli_slave;
+ /* Used to SLAVE SQL thread */
+ Rpl_filter* rpl_filter;
+
void reset_for_next_command(bool calculate_userstat);
/*
Constant for THD::where initialization in the beginning of every query.
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index f3cc03e9cc6..c6372099600 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -170,8 +170,8 @@ const char *xa_state_names[]={
*/
inline bool all_tables_not_ok(THD *thd, TABLE_LIST *tables)
{
- return rpl_filter->is_on() && tables && !thd->spcont &&
- !rpl_filter->tables_ok(thd->db, tables);
+ return thd->rpl_filter->is_on() && tables && !thd->spcont &&
+ !thd->rpl_filter->tables_ok(thd->db, tables);
}
#endif
@@ -1954,6 +1954,8 @@ mysql_execute_command(THD *thd)
#ifdef HAVE_REPLICATION
/* have table map for update for multi-update statement (BUG#37051) */
bool have_table_map_for_update= FALSE;
+ /* */
+ Rpl_filter *rpl_filter= thd->rpl_filter;
#endif
DBUG_ENTER("mysql_execute_command");
#ifdef WITH_PARTITION_STORAGE_ENGINE
@@ -2438,6 +2440,11 @@ case SQLCOM_PREPARE:
else
delete mi;
}
+ else
+ {
+ mi->rpl_filter= get_or_create_rpl_filter(lex_mi->connection_name.str,
+ lex_mi->connection_name.length);
+ }
mysql_mutex_unlock(&LOCK_active_mi);
break;
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 762b35da89a..4393809a6fe 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -3371,19 +3371,48 @@ static Sys_var_mybool Sys_relay_log_recovery(
bool Sys_var_rpl_filter::global_update(THD *thd, set_var *var)
{
bool result= true; // Assume error
+ Master_info *mi;
mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi);
- if (!master_info_index->give_error_if_slave_running())
- result= set_filter_value(var->save_result.string_value.str);
+
+ if (!var->base.length) // no base name
+ {
+ mi= master_info_index->
+ get_master_info(&thd->variables.default_master_connection,
+ MYSQL_ERROR::WARN_LEVEL_ERROR);
+ }
+ else // has base name
+ {
+ mi= master_info_index->
+ get_master_info(&var->base,
+ MYSQL_ERROR::WARN_LEVEL_WARN);
+ }
+
+ if (mi)
+ {
+ if (mi->rli.slave_running)
+ {
+ my_error(ER_SLAVE_MUST_STOP, MYF(0),
+ mi->connection_name.length,
+ mi->connection_name.str);
+ result= true;
+ }
+ else
+ {
+ result= set_filter_value(var->save_result.string_value.str, mi);
+ }
+ }
+
mysql_mutex_unlock(&LOCK_active_mi);
mysql_mutex_lock(&LOCK_global_system_variables);
return result;
}
-bool Sys_var_rpl_filter::set_filter_value(const char *value)
+bool Sys_var_rpl_filter::set_filter_value(const char *value, Master_info *mi)
{
bool status= true;
+ Rpl_filter* rpl_filter= mi ? mi->rpl_filter : global_rpl_filter;
switch (opt_id) {
case OPT_REPLICATE_DO_DB:
@@ -3413,7 +3442,32 @@ uchar *Sys_var_rpl_filter::global_value_ptr(THD *thd, LEX_STRING *base)
{
char buf[256];
String tmp(buf, sizeof(buf), &my_charset_bin);
+ uchar *ret;
+ Master_info *mi;
+ Rpl_filter *rpl_filter;
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ mysql_mutex_lock(&LOCK_active_mi);
+ if (!base->length) // no base name
+ {
+ mi= master_info_index->
+ get_master_info(&thd->variables.default_master_connection,
+ MYSQL_ERROR::WARN_LEVEL_ERROR);
+ }
+ else // has base name
+ {
+ mi= master_info_index->
+ get_master_info(base,
+ MYSQL_ERROR::WARN_LEVEL_WARN);
+ }
+ mysql_mutex_lock(&LOCK_global_system_variables);
+
+ if (!mi)
+ {
+ mysql_mutex_unlock(&LOCK_active_mi);
+ return 0;
+ }
+ rpl_filter= mi->rpl_filter;
tmp.length(0);
switch (opt_id) {
@@ -3437,7 +3491,10 @@ uchar *Sys_var_rpl_filter::global_value_ptr(THD *thd, LEX_STRING *base)
break;
}
- return (uchar *) thd->strmake(tmp.ptr(), tmp.length());
+ ret= (uchar *) thd->strmake(tmp.ptr(), tmp.length());
+ mysql_mutex_unlock(&LOCK_active_mi);
+
+ return ret;
}
static Sys_var_rpl_filter Sys_replicate_do_db(
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index b04e3817406..f8b6537453c 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -28,6 +28,7 @@
#include "keycaches.h"
#include "strfunc.h"
#include "tztime.h" // my_tz_find, my_tz_SYSTEM, struct Time_zone
+#include "rpl_mi.h" // For Multi-Source Replication
/*
a set of mostly trivial (as in f(X)=X) defines below to make system variable
@@ -562,7 +563,7 @@ public:
NO_ARG, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG,
NULL, NULL, NULL), opt_id(getopt_id)
{
- option.var_type= GET_STR;
+ option.var_type= GET_STR | GET_ASK_ADDR;
}
bool do_check(THD *thd, set_var *var)
@@ -588,7 +589,7 @@ public:
protected:
uchar *global_value_ptr(THD *thd, LEX_STRING *base);
- bool set_filter_value(const char *value);
+ bool set_filter_value(const char *value, Master_info *mi);
};
/**