summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libmysqld/lib_sql.cc2
-rw-r--r--sql/keycaches.cc66
-rw-r--r--sql/keycaches.h12
-rw-r--r--sql/log_event.cc11
-rw-r--r--sql/mysqld.cc60
-rw-r--r--sql/rpl_filter.cc12
-rw-r--r--sql/rpl_filter.h3
-rw-r--r--sql/rpl_mi.cc68
-rw-r--r--sql/rpl_mi.h4
-rw-r--r--sql/rpl_rli.cc2
-rw-r--r--sql/slave.cc6
-rw-r--r--sql/sql_acl.cc5
-rw-r--r--sql/sql_class.h4
-rw-r--r--sql/sql_parse.cc11
-rw-r--r--sql/sys_vars.cc65
-rw-r--r--sql/sys_vars.h5
16 files changed, 300 insertions, 36 deletions
diff --git a/libmysqld/lib_sql.cc b/libmysqld/lib_sql.cc
index 5e798674c2d..16c6cc94f1b 100644
--- a/libmysqld/lib_sql.cc
+++ b/libmysqld/lib_sql.cc
@@ -604,7 +604,7 @@ int init_embedded_server(int argc, char **argv, char **groups)
// FIXME initialize binlog_filter and rpl_filter if not already done
// corresponding delete is in clean_up()
if(!binlog_filter) binlog_filter = new Rpl_filter;
- if(!rpl_filter) rpl_filter = new Rpl_filter;
+ if(!global_rpl_filter) global_rpl_filter = new Rpl_filter;
if (opt_init_file)
{
diff --git a/sql/keycaches.cc b/sql/keycaches.cc
index 84ed67d00f0..9e4b943dc83 100644
--- a/sql/keycaches.cc
+++ b/sql/keycaches.cc
@@ -20,6 +20,7 @@
****************************************************************************/
NAMED_ILIST key_caches;
+NAMED_ILIST rpl_filters;
/**
ilink (intrusive list element) with a name
@@ -66,6 +67,23 @@ uchar* find_named(I_List<NAMED_ILINK> *list, const char *name, uint length,
}
+bool NAMED_ILIST::delete_element(const char *name, uint length, void (*free_element)(const char *name, uchar*))
+{
+ I_List_iterator<NAMED_ILINK> it(*this);
+ NAMED_ILINK *element;
+ DBUG_ENTER("NAMED_ILIST::delete_element");
+ while ((element= it++))
+ {
+ if (element->cmp(name, length))
+ {
+ (*free_element)(element->name, element->data);
+ delete element;
+ DBUG_RETURN(0);
+ }
+ }
+ DBUG_RETURN(1);
+}
+
void NAMED_ILIST::delete_elements(void (*free_element)(const char *name, uchar*))
{
NAMED_ILINK *element;
@@ -159,3 +177,51 @@ bool process_key_caches(process_key_cache_t func, void *param)
return res != 0;
}
+/* Rpl_filter functions */
+
+LEX_STRING default_rpl_filter_base= {C_STRING_WITH_LEN("")};
+
+Rpl_filter *get_rpl_filter(LEX_STRING *filter_name)
+{
+ if (!filter_name->length)
+ filter_name= &default_rpl_filter_base;
+ return ((Rpl_filter*) find_named(&rpl_filters,
+ filter_name->str, filter_name->length, 0));
+}
+
+Rpl_filter *create_rpl_filter(const char *name, uint length)
+{
+ Rpl_filter *filter;
+ DBUG_ENTER("create_rpl_filter");
+ DBUG_PRINT("enter",("name: %.*s", length, name));
+
+ filter= new Rpl_filter;
+ if (filter)
+ {
+ if (!new NAMED_ILINK(&rpl_filters, name, length, (uchar*) filter))
+ {
+ delete filter;
+ filter= 0;
+ }
+ }
+ DBUG_RETURN(filter);
+}
+
+
+Rpl_filter *get_or_create_rpl_filter(const char *name, uint length)
+{
+ LEX_STRING rpl_filter_name;
+ Rpl_filter *filter;
+
+ rpl_filter_name.str= (char *) name;
+ rpl_filter_name.length= length;
+ if (!(filter= get_rpl_filter(&rpl_filter_name)))
+ filter= create_rpl_filter(name, length);
+ return filter;
+}
+
+void free_rpl_filter(const char *name, Rpl_filter *filter)
+{
+ delete filter;
+}
+
diff --git a/sql/keycaches.h b/sql/keycaches.h
index 04d3f6145e7..2d52cb28973 100644
--- a/sql/keycaches.h
+++ b/sql/keycaches.h
@@ -18,6 +18,7 @@
#include "sql_list.h"
#include <keycache.h>
+#include <rpl_filter.h>
extern "C"
{
@@ -30,8 +31,10 @@ class NAMED_ILIST: public I_List<NAMED_ILINK>
{
public:
void delete_elements(void (*free_element)(const char*, uchar*));
+ bool delete_element(const char *name, uint length, void (*free_element)(const char*, uchar*));
};
+/* For key cache */
extern LEX_STRING default_key_cache_base;
extern KEY_CACHE zero_key_cache;
extern NAMED_ILIST key_caches;
@@ -42,4 +45,13 @@ KEY_CACHE *get_or_create_key_cache(const char *name, uint length);
void free_key_cache(const char *name, KEY_CACHE *key_cache);
bool process_key_caches(process_key_cache_t func, void *param);
+/* For Rpl_filter */
+extern LEX_STRING default_rpl_filter_base;
+extern NAMED_ILIST rpl_filters;
+
+Rpl_filter *create_rpl_filter(const char *name, uint length);
+Rpl_filter *get_rpl_filter(LEX_STRING *filter_name);
+Rpl_filter *get_or_create_rpl_filter(const char *name, uint length);
+void free_rpl_filter(const char *name, Rpl_filter *filter);
+
#endif /* KEYCACHES_INCLUDED */
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 104ea948cfc..f65d96eb9a8 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -3756,6 +3756,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
HA_CREATE_INFO db_options;
uint64 sub_id= 0;
rpl_gtid gtid;
+ Rpl_filter *rpl_filter= rli->mi->rpl_filter;
DBUG_ENTER("Query_log_event::do_apply_event");
/*
@@ -5439,6 +5440,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
bool use_rli_only_for_errors)
{
LEX_STRING new_db;
+ Rpl_filter *rpl_filter= rli->mi->rpl_filter;
DBUG_ENTER("Load_log_event::do_apply_event");
new_db.length= db_len;
@@ -10044,8 +10046,8 @@ check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list)
enum_tbl_map_status res= OK_TO_PROCESS;
if (rli->sql_thd->slave_thread /* filtering is for slave only */ &&
- (!rpl_filter->db_ok(table_list->db) ||
- (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))))
+ (!rli->mi->rpl_filter->db_ok(table_list->db) ||
+ (rli->mi->rpl_filter->is_on() && !rli->mi->rpl_filter->tables_ok("", table_list))))
res= FILTERED_OUT;
else
{
@@ -10079,6 +10081,7 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
char *db_mem, *tname_mem;
size_t dummy_len;
void *memory;
+ Rpl_filter *filter;
DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
DBUG_ASSERT(rli->sql_thd == thd);
@@ -10092,7 +10095,9 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
NullS)))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
- strmov(db_mem, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
+ /* call from mysql_client_binlog_statement() will not set rli->mi */
+ filter= rli->sql_thd->slave_thread ? rli->mi->rpl_filter : global_rpl_filter;
+ strmov(db_mem, filter->get_rewrite_db(m_dbnam, &dummy_len));
strmov(tname_mem, m_tblnam);
table_list->init_one_table(db_mem, strlen(db_mem),
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index f794dc6ffa5..2d9923d3ef0 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -621,7 +621,8 @@ MYSQL_FILE *bootstrap_file;
int bootstrap_error;
I_List<THD> threads;
-Rpl_filter* rpl_filter;
+Rpl_filter* cur_rpl_filter;
+Rpl_filter* global_rpl_filter;
Rpl_filter* binlog_filter;
THD *first_global_thread()
@@ -1882,7 +1883,7 @@ void clean_up(bool print_message)
#endif
my_uuid_end();
delete binlog_filter;
- delete rpl_filter;
+ delete global_rpl_filter;
end_ssl();
vio_end();
my_regex_end();
@@ -3587,9 +3588,9 @@ static int init_common_variables()
max_system_variables.pseudo_thread_id= (ulong)~0;
server_start_time= flush_status_time= my_time(0);
- rpl_filter= new Rpl_filter;
+ global_rpl_filter= new Rpl_filter;
binlog_filter= new Rpl_filter;
- if (!rpl_filter || !binlog_filter)
+ if (!global_rpl_filter || !binlog_filter)
{
sql_perror("Could not allocate replication and binlog filters");
return 1;
@@ -5125,6 +5126,9 @@ int mysqld_main(int argc, char **argv)
create_shutdown_thread();
start_handle_manager();
+ /* Copy default global rpl_filter to global_rpl_filter */
+ copy_filter_setting(global_rpl_filter, get_or_create_rpl_filter("", 0));
+
/*
init_slave() must be called after the thread keys are created.
Some parts of the code (e.g. SHOW STATUS LIKE 'slave_running' and other
@@ -6541,28 +6545,28 @@ struct my_option my_long_options[]=
"while having selected a different or no database. If you need cross "
"database updates to work, make sure you have 3.23.28 or later, and use "
"replicate-wild-do-table=db_name.%.",
- 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"replicate-do-table", OPT_REPLICATE_DO_TABLE,
"Tells the slave thread to restrict replication to the specified table. "
"To specify more than one table, use the directive multiple times, once "
"for each table. This will work for cross-database updates, in contrast "
- "to replicate-do-db.", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ "to replicate-do-db.", 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"replicate-ignore-db", OPT_REPLICATE_IGNORE_DB,
"Tells the slave thread to not replicate to the specified database. To "
"specify more than one database to ignore, use the directive multiple "
"times, once for each database. This option will not work if you use "
"cross database updates. If you need cross database updates to work, "
"make sure you have 3.23.28 or later, and use replicate-wild-ignore-"
- "table=db_name.%. ", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ "table=db_name.%. ", 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"replicate-ignore-table", OPT_REPLICATE_IGNORE_TABLE,
"Tells the slave thread to not replicate to the specified table. To specify "
"more than one table to ignore, use the directive multiple times, once for "
"each table. This will work for cross-database updates, in contrast to "
- "replicate-ignore-db.", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ "replicate-ignore-db.", 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"replicate-rewrite-db", OPT_REPLICATE_REWRITE_DB,
"Updates to a database with a different name than the original. Example: "
"replicate-rewrite-db=master_db_name->slave_db_name.",
- 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#ifdef HAVE_REPLICATION
{"replicate-same-server-id", 0,
"In replication, if set to 1, do not skip events having our server id. "
@@ -6578,7 +6582,7 @@ struct my_option my_long_options[]=
"database updates. Example: replicate-wild-do-table=foo%.bar% will "
"replicate only updates to tables in all databases that start with foo "
"and whose table names start with bar.",
- 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"replicate-wild-ignore-table", OPT_REPLICATE_WILD_IGNORE_TABLE,
"Tells the slave thread to not replicate to the tables that match the "
"given wildcard pattern. To specify more than one table to ignore, use "
@@ -6586,7 +6590,7 @@ struct my_option my_long_options[]=
"cross-database updates. Example: replicate-wild-ignore-table=foo%.bar% "
"will not do updates to tables in databases that start with foo and whose "
"table names start with bar.",
- 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"safe-mode", OPT_SAFE, "Skip some optimize stages (for testing). Deprecated.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
{"safe-user-create", 0,
@@ -7752,12 +7756,12 @@ mysqld_get_one_option(int optid,
#ifdef HAVE_REPLICATION
case (int)OPT_REPLICATE_IGNORE_DB:
{
- rpl_filter->add_ignore_db(argument);
+ cur_rpl_filter->add_ignore_db(argument);
break;
}
case (int)OPT_REPLICATE_DO_DB:
{
- rpl_filter->add_do_db(argument);
+ cur_rpl_filter->add_do_db(argument);
break;
}
case (int)OPT_REPLICATE_REWRITE_DB:
@@ -7788,7 +7792,7 @@ mysqld_get_one_option(int optid,
return 1;
}
- rpl_filter->add_db_rewrite(key, val);
+ cur_rpl_filter->add_db_rewrite(key, val);
break;
}
@@ -7804,7 +7808,7 @@ mysqld_get_one_option(int optid,
}
case (int)OPT_REPLICATE_DO_TABLE:
{
- if (rpl_filter->add_do_table(argument))
+ if (cur_rpl_filter->add_do_table(argument))
{
sql_print_error("Could not add do table rule '%s'!\n", argument);
return 1;
@@ -7813,7 +7817,7 @@ mysqld_get_one_option(int optid,
}
case (int)OPT_REPLICATE_WILD_DO_TABLE:
{
- if (rpl_filter->add_wild_do_table(argument))
+ if (cur_rpl_filter->add_wild_do_table(argument))
{
sql_print_error("Could not add do table rule '%s'!\n", argument);
return 1;
@@ -7822,7 +7826,7 @@ mysqld_get_one_option(int optid,
}
case (int)OPT_REPLICATE_WILD_IGNORE_TABLE:
{
- if (rpl_filter->add_wild_ignore_table(argument))
+ if (cur_rpl_filter->add_wild_ignore_table(argument))
{
sql_print_error("Could not add ignore table rule '%s'!\n", argument);
return 1;
@@ -7831,7 +7835,7 @@ mysqld_get_one_option(int optid,
}
case (int)OPT_REPLICATE_IGNORE_TABLE:
{
- if (rpl_filter->add_ignore_table(argument))
+ if (cur_rpl_filter->add_ignore_table(argument))
{
sql_print_error("Could not add ignore table rule '%s'!\n", argument);
return 1;
@@ -7952,7 +7956,7 @@ mysqld_get_one_option(int optid,
C_MODE_START
static void*
-mysql_getopt_value(const char *keyname, uint key_length,
+mysql_getopt_value(const char *name, uint length,
const struct my_option *option, int *error)
{
if (error)
@@ -7965,7 +7969,7 @@ mysql_getopt_value(const char *keyname, uint key_length,
case OPT_KEY_CACHE_PARTITIONS:
{
KEY_CACHE *key_cache;
- if (!(key_cache= get_or_create_key_cache(keyname, key_length)))
+ if (!(key_cache= get_or_create_key_cache(name, length)))
{
if (error)
*error= EXIT_OUT_OF_MEMORY;
@@ -7984,6 +7988,22 @@ mysql_getopt_value(const char *keyname, uint key_length,
return (uchar**) &key_cache->param_partitions;
}
}
+ case OPT_REPLICATE_DO_DB:
+ case OPT_REPLICATE_DO_TABLE:
+ case OPT_REPLICATE_IGNORE_DB:
+ case OPT_REPLICATE_IGNORE_TABLE:
+ case OPT_REPLICATE_WILD_DO_TABLE:
+ case OPT_REPLICATE_WILD_IGNORE_TABLE:
+ case OPT_REPLICATE_REWRITE_DB:
+ {
+ /* Store current filter for mysqld_get_one_option() */
+ if (!(cur_rpl_filter= get_or_create_rpl_filter(name, length)))
+ {
+ if (error)
+ *error= EXIT_OUT_OF_MEMORY;
+ }
+ return 0;
+ }
}
return option->value;
}
diff --git a/sql/rpl_filter.cc b/sql/rpl_filter.cc
index f2bd036896d..2e7a2242d45 100644
--- a/sql/rpl_filter.cc
+++ b/sql/rpl_filter.cc
@@ -734,6 +734,18 @@ Rpl_filter::get_rewrite_db(const char* db, size_t *new_len)
}
+void
+Rpl_filter::copy_rewrite_db(Rpl_filter *from)
+{
+ I_List_iterator<i_string_pair> it(from->rewrite_db);
+ i_string_pair* tmp;
+ DBUG_ASSERT(rewrite_db.is_empty());
+
+ /* TODO: Add memory checking here and in all add_xxxx functions ! */
+ while ((tmp=it++))
+ add_db_rewrite(tmp->key, tmp->val);
+}
+
I_List<i_string>*
Rpl_filter::get_do_db()
{
diff --git a/sql/rpl_filter.h b/sql/rpl_filter.h
index 2eb0340b714..65d11cfb6e6 100644
--- a/sql/rpl_filter.h
+++ b/sql/rpl_filter.h
@@ -88,6 +88,7 @@ public:
bool rewrite_db_is_empty();
const char* get_rewrite_db(const char* db, size_t *new_len);
+ void copy_rewrite_db(Rpl_filter *from);
I_List<i_string>* get_do_db();
I_List<i_string>* get_ignore_db();
@@ -139,7 +140,7 @@ private:
I_List<i_string_pair> rewrite_db;
};
-extern Rpl_filter *rpl_filter;
+extern Rpl_filter *global_rpl_filter;
extern Rpl_filter *binlog_filter;
#endif // RPL_FILTER_H
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index fdff61fafec..38fcc54e891 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -60,6 +60,13 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
connection_name.length+1);
my_casedn_str(system_charset_info, cmp_connection_name.str);
}
+ /* When MySQL restarted, all Rpl_filter settings which aren't in the my.cnf
+ * will lose. So if you want a setting will not lose after restarting, you
+ * should add them into my.cnf
+ * */
+ rpl_filter= get_or_create_rpl_filter(connection_name.str,
+ connection_name.length);
+ copy_filter_setting(rpl_filter, global_rpl_filter);
my_init_dynamic_array(&ignore_server_ids,
sizeof(global_system_variables.server_id), 16, 16,
@@ -78,6 +85,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
Master_info::~Master_info()
{
+ rpl_filters.delete_element(connection_name.str, connection_name.length,
+ (void (*)(const char*, uchar*)) free_rpl_filter);
my_free(connection_name.str);
delete_dynamic(&ignore_server_ids);
mysql_mutex_destroy(&run_lock);
@@ -711,6 +720,65 @@ void create_logfile_name_with_suffix(char *res_file_name, size_t length,
}
}
+void copy_filter_setting(Rpl_filter* dst_filter, Rpl_filter* src_filter)
+{
+ char buf[256];
+ String tmp(buf, sizeof(buf), &my_charset_bin);
+
+ dst_filter->get_do_db(&tmp);
+ if (tmp.is_empty())
+ {
+ src_filter->get_do_db(&tmp);
+ if (!tmp.is_empty())
+ dst_filter->set_do_db(tmp.ptr());
+ }
+
+ dst_filter->get_do_table(&tmp);
+ if (tmp.is_empty())
+ {
+ src_filter->get_do_table(&tmp);
+ if (!tmp.is_empty())
+ dst_filter->set_do_table(tmp.ptr());
+ }
+
+ dst_filter->get_ignore_db(&tmp);
+ if (tmp.is_empty())
+ {
+ src_filter->get_ignore_db(&tmp);
+ if (!tmp.is_empty())
+ dst_filter->set_ignore_db(tmp.ptr());
+ }
+
+ dst_filter->get_ignore_table(&tmp);
+ if (tmp.is_empty())
+ {
+ src_filter->get_ignore_table(&tmp);
+ if (!tmp.is_empty())
+ dst_filter->set_ignore_table(tmp.ptr());
+ }
+
+ dst_filter->get_wild_do_table(&tmp);
+ if (tmp.is_empty())
+ {
+ src_filter->get_wild_do_table(&tmp);
+ if (!tmp.is_empty())
+ dst_filter->set_wild_do_table(tmp.ptr());
+ }
+
+ dst_filter->get_wild_ignore_table(&tmp);
+ if (tmp.is_empty())
+ {
+ src_filter->get_wild_ignore_table(&tmp);
+ if (!tmp.is_empty())
+ dst_filter->set_wild_ignore_table(tmp.ptr());
+ }
+
+ if (dst_filter->rewrite_db_is_empty())
+ {
+ if (!src_filter->rewrite_db_is_empty())
+ dst_filter->copy_rewrite_db(src_filter);
+ }
+}
Master_info_index::Master_info_index()
{
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index b6a3e7d91b9..64501e96f00 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -21,6 +21,8 @@
#include "rpl_rli.h"
#include "rpl_reporting.h"
#include "my_sys.h"
+#include "rpl_filter.h"
+#include "keycaches.h"
typedef struct st_mysql MYSQL;
@@ -92,6 +94,7 @@ class Master_info : public Slave_reporting_capability
uint32 file_id; /* for 3.23 load data infile */
Relay_log_info rli;
uint port;
+ Rpl_filter* rpl_filter; /* Each replication can set its filter rule*/
/*
to hold checksum alg in use until IO thread has received FD.
Initialized to novalue, then set to the queried from master
@@ -141,6 +144,7 @@ int flush_master_info(Master_info* mi,
bool flush_relay_log_cache,
bool need_lock_relay_log);
int change_master_server_id_cmp(ulong *id1, ulong *id2);
+void copy_filter_setting(Rpl_filter* dst_filter, Rpl_filter* src_filter);
/*
Multi master are handled trough this struct.
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 6d53e6c3187..11c6e54c8f8 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -61,7 +61,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0),
last_event_start_time(0), deferred_events(NULL),m_flags(0),
row_stmt_start_timestamp(0), long_find_row_note_printed(false),
- m_annotate_event(0)
+ m_annotate_event(0), mi(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
diff --git a/sql/slave.cc b/sql/slave.cc
index 8f1d8669770..2e436d5e8b4 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -2302,6 +2302,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
DBUG_PRINT("info",("host is set: '%s'", mi->host));
String *packet= &thd->packet;
Protocol *protocol= thd->protocol;
+ Rpl_filter *rpl_filter= mi->rpl_filter;
char buf[256];
String tmp(buf, sizeof(buf), &my_charset_bin);
@@ -3787,6 +3788,7 @@ pthread_handler_t handle_slave_sql(void *arg)
thd = new THD; // note that contructor of THD uses DBUG_ !
thd->thread_stack = (char*)&thd; // remember where our stack is
+ thd->rpl_filter = mi->rpl_filter;
DBUG_ASSERT(rli->inited);
DBUG_ASSERT(rli->mi == mi);
@@ -3817,7 +3819,7 @@ pthread_handler_t handle_slave_sql(void *arg)
}
thd->init_for_queries();
thd->rli_slave= rli;
- if ((rli->deferred_events_collecting= rpl_filter->is_on()))
+ if ((rli->deferred_events_collecting= mi->rpl_filter->is_on()))
{
rli->deferred_events= new Deferred_log_events(rli);
}
@@ -4145,7 +4147,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
if (unlikely(!cev->is_valid()))
DBUG_RETURN(1);
- if (!rpl_filter->db_ok(cev->db))
+ if (!mi->rpl_filter->db_ok(cev->db))
{
skip_load_data_infile(net);
DBUG_RETURN(0);
diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc
index d9470094c63..7810bdb16e2 100644
--- a/sql/sql_acl.cc
+++ b/sql/sql_acl.cc
@@ -1905,6 +1905,7 @@ bool change_password(THD *thd, const char *host, const char *user,
{
TABLE_LIST tables;
TABLE *table;
+ Rpl_filter *rpl_filter= thd->rpl_filter;
/* Buffer should be extended when password length is extended. */
char buff[512];
ulong query_length;
@@ -3593,6 +3594,7 @@ int mysql_table_grant(THD *thd, TABLE_LIST *table_list,
TABLE_LIST tables[3];
bool create_new_users=0;
char *db_name, *table_name;
+ Rpl_filter *rpl_filter= thd->rpl_filter;
DBUG_ENTER("mysql_table_grant");
if (!initialized)
@@ -3869,6 +3871,7 @@ bool mysql_routine_grant(THD *thd, TABLE_LIST *table_list, bool is_proc,
TABLE_LIST tables[2];
bool create_new_users=0, result=0;
char *db_name, *table_name;
+ Rpl_filter *rpl_filter= thd->rpl_filter;
DBUG_ENTER("mysql_routine_grant");
if (!initialized)
@@ -4009,6 +4012,7 @@ bool mysql_grant(THD *thd, const char *db, List <LEX_USER> &list,
char tmp_db[SAFE_NAME_LEN+1];
bool create_new_users=0;
TABLE_LIST tables[2];
+ Rpl_filter *rpl_filter= thd->rpl_filter;
DBUG_ENTER("mysql_grant");
if (!initialized)
@@ -5758,6 +5762,7 @@ void get_mqh(const char *user, const char *host, USER_CONN *uc)
#define GRANT_TABLES 6
int open_grant_tables(THD *thd, TABLE_LIST *tables)
{
+ Rpl_filter *rpl_filter= thd->rpl_filter;
DBUG_ENTER("open_grant_tables");
if (!initialized)
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 221c0d3bd51..8e2bd59da57 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -47,6 +47,7 @@
class Reprepare_observer;
class Relay_log_info;
+class Rpl_filter;
class Query_log_event;
class Load_log_event;
@@ -1588,6 +1589,9 @@ public:
/* Slave applier execution context */
Relay_log_info* rli_slave;
+ /* Used to SLAVE SQL thread */
+ Rpl_filter* rpl_filter;
+
void reset_for_next_command(bool calculate_userstat);
/*
Constant for THD::where initialization in the beginning of every query.
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index f3cc03e9cc6..c6372099600 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -170,8 +170,8 @@ const char *xa_state_names[]={
*/
inline bool all_tables_not_ok(THD *thd, TABLE_LIST *tables)
{
- return rpl_filter->is_on() && tables && !thd->spcont &&
- !rpl_filter->tables_ok(thd->db, tables);
+ return thd->rpl_filter->is_on() && tables && !thd->spcont &&
+ !thd->rpl_filter->tables_ok(thd->db, tables);
}
#endif
@@ -1954,6 +1954,8 @@ mysql_execute_command(THD *thd)
#ifdef HAVE_REPLICATION
/* have table map for update for multi-update statement (BUG#37051) */
bool have_table_map_for_update= FALSE;
+ /* */
+ Rpl_filter *rpl_filter= thd->rpl_filter;
#endif
DBUG_ENTER("mysql_execute_command");
#ifdef WITH_PARTITION_STORAGE_ENGINE
@@ -2438,6 +2440,11 @@ case SQLCOM_PREPARE:
else
delete mi;
}
+ else
+ {
+ mi->rpl_filter= get_or_create_rpl_filter(lex_mi->connection_name.str,
+ lex_mi->connection_name.length);
+ }
mysql_mutex_unlock(&LOCK_active_mi);
break;
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 762b35da89a..4393809a6fe 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -3371,19 +3371,48 @@ static Sys_var_mybool Sys_relay_log_recovery(
bool Sys_var_rpl_filter::global_update(THD *thd, set_var *var)
{
bool result= true; // Assume error
+ Master_info *mi;
mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi);
- if (!master_info_index->give_error_if_slave_running())
- result= set_filter_value(var->save_result.string_value.str);
+
+ if (!var->base.length) // no base name
+ {
+ mi= master_info_index->
+ get_master_info(&thd->variables.default_master_connection,
+ MYSQL_ERROR::WARN_LEVEL_ERROR);
+ }
+ else // has base name
+ {
+ mi= master_info_index->
+ get_master_info(&var->base,
+ MYSQL_ERROR::WARN_LEVEL_WARN);
+ }
+
+ if (mi)
+ {
+ if (mi->rli.slave_running)
+ {
+ my_error(ER_SLAVE_MUST_STOP, MYF(0),
+ mi->connection_name.length,
+ mi->connection_name.str);
+ result= true;
+ }
+ else
+ {
+ result= set_filter_value(var->save_result.string_value.str, mi);
+ }
+ }
+
mysql_mutex_unlock(&LOCK_active_mi);
mysql_mutex_lock(&LOCK_global_system_variables);
return result;
}
-bool Sys_var_rpl_filter::set_filter_value(const char *value)
+bool Sys_var_rpl_filter::set_filter_value(const char *value, Master_info *mi)
{
bool status= true;
+ Rpl_filter* rpl_filter= mi ? mi->rpl_filter : global_rpl_filter;
switch (opt_id) {
case OPT_REPLICATE_DO_DB:
@@ -3413,7 +3442,32 @@ uchar *Sys_var_rpl_filter::global_value_ptr(THD *thd, LEX_STRING *base)
{
char buf[256];
String tmp(buf, sizeof(buf), &my_charset_bin);
+ uchar *ret;
+ Master_info *mi;
+ Rpl_filter *rpl_filter;
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ mysql_mutex_lock(&LOCK_active_mi);
+ if (!base->length) // no base name
+ {
+ mi= master_info_index->
+ get_master_info(&thd->variables.default_master_connection,
+ MYSQL_ERROR::WARN_LEVEL_ERROR);
+ }
+ else // has base name
+ {
+ mi= master_info_index->
+ get_master_info(base,
+ MYSQL_ERROR::WARN_LEVEL_WARN);
+ }
+ mysql_mutex_lock(&LOCK_global_system_variables);
+
+ if (!mi)
+ {
+ mysql_mutex_unlock(&LOCK_active_mi);
+ return 0;
+ }
+ rpl_filter= mi->rpl_filter;
tmp.length(0);
switch (opt_id) {
@@ -3437,7 +3491,10 @@ uchar *Sys_var_rpl_filter::global_value_ptr(THD *thd, LEX_STRING *base)
break;
}
- return (uchar *) thd->strmake(tmp.ptr(), tmp.length());
+ ret= (uchar *) thd->strmake(tmp.ptr(), tmp.length());
+ mysql_mutex_unlock(&LOCK_active_mi);
+
+ return ret;
}
static Sys_var_rpl_filter Sys_replicate_do_db(
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index b04e3817406..f8b6537453c 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -28,6 +28,7 @@
#include "keycaches.h"
#include "strfunc.h"
#include "tztime.h" // my_tz_find, my_tz_SYSTEM, struct Time_zone
+#include "rpl_mi.h" // For Multi-Source Replication
/*
a set of mostly trivial (as in f(X)=X) defines below to make system variable
@@ -562,7 +563,7 @@ public:
NO_ARG, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG,
NULL, NULL, NULL), opt_id(getopt_id)
{
- option.var_type= GET_STR;
+ option.var_type= GET_STR | GET_ASK_ADDR;
}
bool do_check(THD *thd, set_var *var)
@@ -588,7 +589,7 @@ public:
protected:
uchar *global_value_ptr(THD *thd, LEX_STRING *base);
- bool set_filter_value(const char *value);
+ bool set_filter_value(const char *value, Master_info *mi);
};
/**