diff options
-rw-r--r-- | libmysqld/lib_sql.cc | 2 | ||||
-rw-r--r-- | sql/keycaches.cc | 66 | ||||
-rw-r--r-- | sql/keycaches.h | 12 | ||||
-rw-r--r-- | sql/log_event.cc | 11 | ||||
-rw-r--r-- | sql/mysqld.cc | 60 | ||||
-rw-r--r-- | sql/rpl_filter.cc | 12 | ||||
-rw-r--r-- | sql/rpl_filter.h | 3 | ||||
-rw-r--r-- | sql/rpl_mi.cc | 68 | ||||
-rw-r--r-- | sql/rpl_mi.h | 4 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 2 | ||||
-rw-r--r-- | sql/slave.cc | 6 | ||||
-rw-r--r-- | sql/sql_acl.cc | 5 | ||||
-rw-r--r-- | sql/sql_class.h | 4 | ||||
-rw-r--r-- | sql/sql_parse.cc | 11 | ||||
-rw-r--r-- | sql/sys_vars.cc | 65 | ||||
-rw-r--r-- | sql/sys_vars.h | 5 |
16 files changed, 300 insertions, 36 deletions
diff --git a/libmysqld/lib_sql.cc b/libmysqld/lib_sql.cc index 5e798674c2d..16c6cc94f1b 100644 --- a/libmysqld/lib_sql.cc +++ b/libmysqld/lib_sql.cc @@ -604,7 +604,7 @@ int init_embedded_server(int argc, char **argv, char **groups) // FIXME initialize binlog_filter and rpl_filter if not already done // corresponding delete is in clean_up() if(!binlog_filter) binlog_filter = new Rpl_filter; - if(!rpl_filter) rpl_filter = new Rpl_filter; + if(!global_rpl_filter) global_rpl_filter = new Rpl_filter; if (opt_init_file) { diff --git a/sql/keycaches.cc b/sql/keycaches.cc index 84ed67d00f0..9e4b943dc83 100644 --- a/sql/keycaches.cc +++ b/sql/keycaches.cc @@ -20,6 +20,7 @@ ****************************************************************************/ NAMED_ILIST key_caches; +NAMED_ILIST rpl_filters; /** ilink (intrusive list element) with a name @@ -66,6 +67,23 @@ uchar* find_named(I_List<NAMED_ILINK> *list, const char *name, uint length, } +bool NAMED_ILIST::delete_element(const char *name, uint length, void (*free_element)(const char *name, uchar*)) +{ + I_List_iterator<NAMED_ILINK> it(*this); + NAMED_ILINK *element; + DBUG_ENTER("NAMED_ILIST::delete_element"); + while ((element= it++)) + { + if (element->cmp(name, length)) + { + (*free_element)(element->name, element->data); + delete element; + DBUG_RETURN(0); + } + } + DBUG_RETURN(1); +} + void NAMED_ILIST::delete_elements(void (*free_element)(const char *name, uchar*)) { NAMED_ILINK *element; @@ -159,3 +177,51 @@ bool process_key_caches(process_key_cache_t func, void *param) return res != 0; } +/* Rpl_filter functions */ + +LEX_STRING default_rpl_filter_base= {C_STRING_WITH_LEN("")}; + +Rpl_filter *get_rpl_filter(LEX_STRING *filter_name) +{ + if (!filter_name->length) + filter_name= &default_rpl_filter_base; + return ((Rpl_filter*) find_named(&rpl_filters, + filter_name->str, filter_name->length, 0)); +} + +Rpl_filter *create_rpl_filter(const char *name, uint length) +{ + Rpl_filter *filter; + DBUG_ENTER("create_rpl_filter"); + DBUG_PRINT("enter",("name: %.*s", length, name)); + + filter= new Rpl_filter; + if (filter) + { + if (!new NAMED_ILINK(&rpl_filters, name, length, (uchar*) filter)) + { + delete filter; + filter= 0; + } + } + DBUG_RETURN(filter); +} + + +Rpl_filter *get_or_create_rpl_filter(const char *name, uint length) +{ + LEX_STRING rpl_filter_name; + Rpl_filter *filter; + + rpl_filter_name.str= (char *) name; + rpl_filter_name.length= length; + if (!(filter= get_rpl_filter(&rpl_filter_name))) + filter= create_rpl_filter(name, length); + return filter; +} + +void free_rpl_filter(const char *name, Rpl_filter *filter) +{ + delete filter; +} + diff --git a/sql/keycaches.h b/sql/keycaches.h index 04d3f6145e7..2d52cb28973 100644 --- a/sql/keycaches.h +++ b/sql/keycaches.h @@ -18,6 +18,7 @@ #include "sql_list.h" #include <keycache.h> +#include <rpl_filter.h> extern "C" { @@ -30,8 +31,10 @@ class NAMED_ILIST: public I_List<NAMED_ILINK> { public: void delete_elements(void (*free_element)(const char*, uchar*)); + bool delete_element(const char *name, uint length, void (*free_element)(const char*, uchar*)); }; +/* For key cache */ extern LEX_STRING default_key_cache_base; extern KEY_CACHE zero_key_cache; extern NAMED_ILIST key_caches; @@ -42,4 +45,13 @@ KEY_CACHE *get_or_create_key_cache(const char *name, uint length); void free_key_cache(const char *name, KEY_CACHE *key_cache); bool process_key_caches(process_key_cache_t func, void *param); +/* For Rpl_filter */ +extern LEX_STRING default_rpl_filter_base; +extern NAMED_ILIST rpl_filters; + +Rpl_filter *create_rpl_filter(const char *name, uint length); +Rpl_filter *get_rpl_filter(LEX_STRING *filter_name); +Rpl_filter *get_or_create_rpl_filter(const char *name, uint length); +void free_rpl_filter(const char *name, Rpl_filter *filter); + #endif /* KEYCACHES_INCLUDED */ diff --git a/sql/log_event.cc b/sql/log_event.cc index 104ea948cfc..f65d96eb9a8 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -3756,6 +3756,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli, HA_CREATE_INFO db_options; uint64 sub_id= 0; rpl_gtid gtid; + Rpl_filter *rpl_filter= rli->mi->rpl_filter; DBUG_ENTER("Query_log_event::do_apply_event"); /* @@ -5439,6 +5440,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli, bool use_rli_only_for_errors) { LEX_STRING new_db; + Rpl_filter *rpl_filter= rli->mi->rpl_filter; DBUG_ENTER("Load_log_event::do_apply_event"); new_db.length= db_len; @@ -10044,8 +10046,8 @@ check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list) enum_tbl_map_status res= OK_TO_PROCESS; if (rli->sql_thd->slave_thread /* filtering is for slave only */ && - (!rpl_filter->db_ok(table_list->db) || - (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list)))) + (!rli->mi->rpl_filter->db_ok(table_list->db) || + (rli->mi->rpl_filter->is_on() && !rli->mi->rpl_filter->tables_ok("", table_list)))) res= FILTERED_OUT; else { @@ -10079,6 +10081,7 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli) char *db_mem, *tname_mem; size_t dummy_len; void *memory; + Rpl_filter *filter; DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)"); DBUG_ASSERT(rli->sql_thd == thd); @@ -10092,7 +10095,9 @@ int Table_map_log_event::do_apply_event(Relay_log_info const *rli) NullS))) DBUG_RETURN(HA_ERR_OUT_OF_MEM); - strmov(db_mem, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len)); + /* call from mysql_client_binlog_statement() will not set rli->mi */ + filter= rli->sql_thd->slave_thread ? rli->mi->rpl_filter : global_rpl_filter; + strmov(db_mem, filter->get_rewrite_db(m_dbnam, &dummy_len)); strmov(tname_mem, m_tblnam); table_list->init_one_table(db_mem, strlen(db_mem), diff --git a/sql/mysqld.cc b/sql/mysqld.cc index f794dc6ffa5..2d9923d3ef0 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -621,7 +621,8 @@ MYSQL_FILE *bootstrap_file; int bootstrap_error; I_List<THD> threads; -Rpl_filter* rpl_filter; +Rpl_filter* cur_rpl_filter; +Rpl_filter* global_rpl_filter; Rpl_filter* binlog_filter; THD *first_global_thread() @@ -1882,7 +1883,7 @@ void clean_up(bool print_message) #endif my_uuid_end(); delete binlog_filter; - delete rpl_filter; + delete global_rpl_filter; end_ssl(); vio_end(); my_regex_end(); @@ -3587,9 +3588,9 @@ static int init_common_variables() max_system_variables.pseudo_thread_id= (ulong)~0; server_start_time= flush_status_time= my_time(0); - rpl_filter= new Rpl_filter; + global_rpl_filter= new Rpl_filter; binlog_filter= new Rpl_filter; - if (!rpl_filter || !binlog_filter) + if (!global_rpl_filter || !binlog_filter) { sql_perror("Could not allocate replication and binlog filters"); return 1; @@ -5125,6 +5126,9 @@ int mysqld_main(int argc, char **argv) create_shutdown_thread(); start_handle_manager(); + /* Copy default global rpl_filter to global_rpl_filter */ + copy_filter_setting(global_rpl_filter, get_or_create_rpl_filter("", 0)); + /* init_slave() must be called after the thread keys are created. Some parts of the code (e.g. SHOW STATUS LIKE 'slave_running' and other @@ -6541,28 +6545,28 @@ struct my_option my_long_options[]= "while having selected a different or no database. If you need cross " "database updates to work, make sure you have 3.23.28 or later, and use " "replicate-wild-do-table=db_name.%.", - 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"replicate-do-table", OPT_REPLICATE_DO_TABLE, "Tells the slave thread to restrict replication to the specified table. " "To specify more than one table, use the directive multiple times, once " "for each table. This will work for cross-database updates, in contrast " - "to replicate-do-db.", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + "to replicate-do-db.", 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"replicate-ignore-db", OPT_REPLICATE_IGNORE_DB, "Tells the slave thread to not replicate to the specified database. To " "specify more than one database to ignore, use the directive multiple " "times, once for each database. This option will not work if you use " "cross database updates. If you need cross database updates to work, " "make sure you have 3.23.28 or later, and use replicate-wild-ignore-" - "table=db_name.%. ", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + "table=db_name.%. ", 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"replicate-ignore-table", OPT_REPLICATE_IGNORE_TABLE, "Tells the slave thread to not replicate to the specified table. To specify " "more than one table to ignore, use the directive multiple times, once for " "each table. This will work for cross-database updates, in contrast to " - "replicate-ignore-db.", 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + "replicate-ignore-db.", 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"replicate-rewrite-db", OPT_REPLICATE_REWRITE_DB, "Updates to a database with a different name than the original. Example: " "replicate-rewrite-db=master_db_name->slave_db_name.", - 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, #ifdef HAVE_REPLICATION {"replicate-same-server-id", 0, "In replication, if set to 1, do not skip events having our server id. " @@ -6578,7 +6582,7 @@ struct my_option my_long_options[]= "database updates. Example: replicate-wild-do-table=foo%.bar% will " "replicate only updates to tables in all databases that start with foo " "and whose table names start with bar.", - 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"replicate-wild-ignore-table", OPT_REPLICATE_WILD_IGNORE_TABLE, "Tells the slave thread to not replicate to the tables that match the " "given wildcard pattern. To specify more than one table to ignore, use " @@ -6586,7 +6590,7 @@ struct my_option my_long_options[]= "cross-database updates. Example: replicate-wild-ignore-table=foo%.bar% " "will not do updates to tables in databases that start with foo and whose " "table names start with bar.", - 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + 0, 0, 0, GET_STR | GET_ASK_ADDR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"safe-mode", OPT_SAFE, "Skip some optimize stages (for testing). Deprecated.", 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, {"safe-user-create", 0, @@ -7752,12 +7756,12 @@ mysqld_get_one_option(int optid, #ifdef HAVE_REPLICATION case (int)OPT_REPLICATE_IGNORE_DB: { - rpl_filter->add_ignore_db(argument); + cur_rpl_filter->add_ignore_db(argument); break; } case (int)OPT_REPLICATE_DO_DB: { - rpl_filter->add_do_db(argument); + cur_rpl_filter->add_do_db(argument); break; } case (int)OPT_REPLICATE_REWRITE_DB: @@ -7788,7 +7792,7 @@ mysqld_get_one_option(int optid, return 1; } - rpl_filter->add_db_rewrite(key, val); + cur_rpl_filter->add_db_rewrite(key, val); break; } @@ -7804,7 +7808,7 @@ mysqld_get_one_option(int optid, } case (int)OPT_REPLICATE_DO_TABLE: { - if (rpl_filter->add_do_table(argument)) + if (cur_rpl_filter->add_do_table(argument)) { sql_print_error("Could not add do table rule '%s'!\n", argument); return 1; @@ -7813,7 +7817,7 @@ mysqld_get_one_option(int optid, } case (int)OPT_REPLICATE_WILD_DO_TABLE: { - if (rpl_filter->add_wild_do_table(argument)) + if (cur_rpl_filter->add_wild_do_table(argument)) { sql_print_error("Could not add do table rule '%s'!\n", argument); return 1; @@ -7822,7 +7826,7 @@ mysqld_get_one_option(int optid, } case (int)OPT_REPLICATE_WILD_IGNORE_TABLE: { - if (rpl_filter->add_wild_ignore_table(argument)) + if (cur_rpl_filter->add_wild_ignore_table(argument)) { sql_print_error("Could not add ignore table rule '%s'!\n", argument); return 1; @@ -7831,7 +7835,7 @@ mysqld_get_one_option(int optid, } case (int)OPT_REPLICATE_IGNORE_TABLE: { - if (rpl_filter->add_ignore_table(argument)) + if (cur_rpl_filter->add_ignore_table(argument)) { sql_print_error("Could not add ignore table rule '%s'!\n", argument); return 1; @@ -7952,7 +7956,7 @@ mysqld_get_one_option(int optid, C_MODE_START static void* -mysql_getopt_value(const char *keyname, uint key_length, +mysql_getopt_value(const char *name, uint length, const struct my_option *option, int *error) { if (error) @@ -7965,7 +7969,7 @@ mysql_getopt_value(const char *keyname, uint key_length, case OPT_KEY_CACHE_PARTITIONS: { KEY_CACHE *key_cache; - if (!(key_cache= get_or_create_key_cache(keyname, key_length))) + if (!(key_cache= get_or_create_key_cache(name, length))) { if (error) *error= EXIT_OUT_OF_MEMORY; @@ -7984,6 +7988,22 @@ mysql_getopt_value(const char *keyname, uint key_length, return (uchar**) &key_cache->param_partitions; } } + case OPT_REPLICATE_DO_DB: + case OPT_REPLICATE_DO_TABLE: + case OPT_REPLICATE_IGNORE_DB: + case OPT_REPLICATE_IGNORE_TABLE: + case OPT_REPLICATE_WILD_DO_TABLE: + case OPT_REPLICATE_WILD_IGNORE_TABLE: + case OPT_REPLICATE_REWRITE_DB: + { + /* Store current filter for mysqld_get_one_option() */ + if (!(cur_rpl_filter= get_or_create_rpl_filter(name, length))) + { + if (error) + *error= EXIT_OUT_OF_MEMORY; + } + return 0; + } } return option->value; } diff --git a/sql/rpl_filter.cc b/sql/rpl_filter.cc index f2bd036896d..2e7a2242d45 100644 --- a/sql/rpl_filter.cc +++ b/sql/rpl_filter.cc @@ -734,6 +734,18 @@ Rpl_filter::get_rewrite_db(const char* db, size_t *new_len) } +void +Rpl_filter::copy_rewrite_db(Rpl_filter *from) +{ + I_List_iterator<i_string_pair> it(from->rewrite_db); + i_string_pair* tmp; + DBUG_ASSERT(rewrite_db.is_empty()); + + /* TODO: Add memory checking here and in all add_xxxx functions ! */ + while ((tmp=it++)) + add_db_rewrite(tmp->key, tmp->val); +} + I_List<i_string>* Rpl_filter::get_do_db() { diff --git a/sql/rpl_filter.h b/sql/rpl_filter.h index 2eb0340b714..65d11cfb6e6 100644 --- a/sql/rpl_filter.h +++ b/sql/rpl_filter.h @@ -88,6 +88,7 @@ public: bool rewrite_db_is_empty(); const char* get_rewrite_db(const char* db, size_t *new_len); + void copy_rewrite_db(Rpl_filter *from); I_List<i_string>* get_do_db(); I_List<i_string>* get_ignore_db(); @@ -139,7 +140,7 @@ private: I_List<i_string_pair> rewrite_db; }; -extern Rpl_filter *rpl_filter; +extern Rpl_filter *global_rpl_filter; extern Rpl_filter *binlog_filter; #endif // RPL_FILTER_H diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index fdff61fafec..38fcc54e891 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -60,6 +60,13 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, connection_name.length+1); my_casedn_str(system_charset_info, cmp_connection_name.str); } + /* When MySQL restarted, all Rpl_filter settings which aren't in the my.cnf + * will lose. So if you want a setting will not lose after restarting, you + * should add them into my.cnf + * */ + rpl_filter= get_or_create_rpl_filter(connection_name.str, + connection_name.length); + copy_filter_setting(rpl_filter, global_rpl_filter); my_init_dynamic_array(&ignore_server_ids, sizeof(global_system_variables.server_id), 16, 16, @@ -78,6 +85,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, Master_info::~Master_info() { + rpl_filters.delete_element(connection_name.str, connection_name.length, + (void (*)(const char*, uchar*)) free_rpl_filter); my_free(connection_name.str); delete_dynamic(&ignore_server_ids); mysql_mutex_destroy(&run_lock); @@ -711,6 +720,65 @@ void create_logfile_name_with_suffix(char *res_file_name, size_t length, } } +void copy_filter_setting(Rpl_filter* dst_filter, Rpl_filter* src_filter) +{ + char buf[256]; + String tmp(buf, sizeof(buf), &my_charset_bin); + + dst_filter->get_do_db(&tmp); + if (tmp.is_empty()) + { + src_filter->get_do_db(&tmp); + if (!tmp.is_empty()) + dst_filter->set_do_db(tmp.ptr()); + } + + dst_filter->get_do_table(&tmp); + if (tmp.is_empty()) + { + src_filter->get_do_table(&tmp); + if (!tmp.is_empty()) + dst_filter->set_do_table(tmp.ptr()); + } + + dst_filter->get_ignore_db(&tmp); + if (tmp.is_empty()) + { + src_filter->get_ignore_db(&tmp); + if (!tmp.is_empty()) + dst_filter->set_ignore_db(tmp.ptr()); + } + + dst_filter->get_ignore_table(&tmp); + if (tmp.is_empty()) + { + src_filter->get_ignore_table(&tmp); + if (!tmp.is_empty()) + dst_filter->set_ignore_table(tmp.ptr()); + } + + dst_filter->get_wild_do_table(&tmp); + if (tmp.is_empty()) + { + src_filter->get_wild_do_table(&tmp); + if (!tmp.is_empty()) + dst_filter->set_wild_do_table(tmp.ptr()); + } + + dst_filter->get_wild_ignore_table(&tmp); + if (tmp.is_empty()) + { + src_filter->get_wild_ignore_table(&tmp); + if (!tmp.is_empty()) + dst_filter->set_wild_ignore_table(tmp.ptr()); + } + + if (dst_filter->rewrite_db_is_empty()) + { + if (!src_filter->rewrite_db_is_empty()) + dst_filter->copy_rewrite_db(src_filter); + } +} Master_info_index::Master_info_index() { diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index b6a3e7d91b9..64501e96f00 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -21,6 +21,8 @@ #include "rpl_rli.h" #include "rpl_reporting.h" #include "my_sys.h" +#include "rpl_filter.h" +#include "keycaches.h" typedef struct st_mysql MYSQL; @@ -92,6 +94,7 @@ class Master_info : public Slave_reporting_capability uint32 file_id; /* for 3.23 load data infile */ Relay_log_info rli; uint port; + Rpl_filter* rpl_filter; /* Each replication can set its filter rule*/ /* to hold checksum alg in use until IO thread has received FD. Initialized to novalue, then set to the queried from master @@ -141,6 +144,7 @@ int flush_master_info(Master_info* mi, bool flush_relay_log_cache, bool need_lock_relay_log); int change_master_server_id_cmp(ulong *id1, ulong *id2); +void copy_filter_setting(Rpl_filter* dst_filter, Rpl_filter* src_filter); /* Multi master are handled trough this struct. diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 6d53e6c3187..11c6e54c8f8 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -61,7 +61,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) gtid_sub_id(0), tables_to_lock(0), tables_to_lock_count(0), last_event_start_time(0), deferred_events(NULL),m_flags(0), row_stmt_start_timestamp(0), long_find_row_note_printed(false), - m_annotate_event(0) + m_annotate_event(0), mi(0) { DBUG_ENTER("Relay_log_info::Relay_log_info"); diff --git a/sql/slave.cc b/sql/slave.cc index 8f1d8669770..2e436d5e8b4 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -2302,6 +2302,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, DBUG_PRINT("info",("host is set: '%s'", mi->host)); String *packet= &thd->packet; Protocol *protocol= thd->protocol; + Rpl_filter *rpl_filter= mi->rpl_filter; char buf[256]; String tmp(buf, sizeof(buf), &my_charset_bin); @@ -3787,6 +3788,7 @@ pthread_handler_t handle_slave_sql(void *arg) thd = new THD; // note that contructor of THD uses DBUG_ ! thd->thread_stack = (char*)&thd; // remember where our stack is + thd->rpl_filter = mi->rpl_filter; DBUG_ASSERT(rli->inited); DBUG_ASSERT(rli->mi == mi); @@ -3817,7 +3819,7 @@ pthread_handler_t handle_slave_sql(void *arg) } thd->init_for_queries(); thd->rli_slave= rli; - if ((rli->deferred_events_collecting= rpl_filter->is_on())) + if ((rli->deferred_events_collecting= mi->rpl_filter->is_on())) { rli->deferred_events= new Deferred_log_events(rli); } @@ -4145,7 +4147,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev) if (unlikely(!cev->is_valid())) DBUG_RETURN(1); - if (!rpl_filter->db_ok(cev->db)) + if (!mi->rpl_filter->db_ok(cev->db)) { skip_load_data_infile(net); DBUG_RETURN(0); diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index d9470094c63..7810bdb16e2 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -1905,6 +1905,7 @@ bool change_password(THD *thd, const char *host, const char *user, { TABLE_LIST tables; TABLE *table; + Rpl_filter *rpl_filter= thd->rpl_filter; /* Buffer should be extended when password length is extended. */ char buff[512]; ulong query_length; @@ -3593,6 +3594,7 @@ int mysql_table_grant(THD *thd, TABLE_LIST *table_list, TABLE_LIST tables[3]; bool create_new_users=0; char *db_name, *table_name; + Rpl_filter *rpl_filter= thd->rpl_filter; DBUG_ENTER("mysql_table_grant"); if (!initialized) @@ -3869,6 +3871,7 @@ bool mysql_routine_grant(THD *thd, TABLE_LIST *table_list, bool is_proc, TABLE_LIST tables[2]; bool create_new_users=0, result=0; char *db_name, *table_name; + Rpl_filter *rpl_filter= thd->rpl_filter; DBUG_ENTER("mysql_routine_grant"); if (!initialized) @@ -4009,6 +4012,7 @@ bool mysql_grant(THD *thd, const char *db, List <LEX_USER> &list, char tmp_db[SAFE_NAME_LEN+1]; bool create_new_users=0; TABLE_LIST tables[2]; + Rpl_filter *rpl_filter= thd->rpl_filter; DBUG_ENTER("mysql_grant"); if (!initialized) @@ -5758,6 +5762,7 @@ void get_mqh(const char *user, const char *host, USER_CONN *uc) #define GRANT_TABLES 6 int open_grant_tables(THD *thd, TABLE_LIST *tables) { + Rpl_filter *rpl_filter= thd->rpl_filter; DBUG_ENTER("open_grant_tables"); if (!initialized) diff --git a/sql/sql_class.h b/sql/sql_class.h index 221c0d3bd51..8e2bd59da57 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -47,6 +47,7 @@ class Reprepare_observer; class Relay_log_info; +class Rpl_filter; class Query_log_event; class Load_log_event; @@ -1588,6 +1589,9 @@ public: /* Slave applier execution context */ Relay_log_info* rli_slave; + /* Used to SLAVE SQL thread */ + Rpl_filter* rpl_filter; + void reset_for_next_command(bool calculate_userstat); /* Constant for THD::where initialization in the beginning of every query. diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index f3cc03e9cc6..c6372099600 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -170,8 +170,8 @@ const char *xa_state_names[]={ */ inline bool all_tables_not_ok(THD *thd, TABLE_LIST *tables) { - return rpl_filter->is_on() && tables && !thd->spcont && - !rpl_filter->tables_ok(thd->db, tables); + return thd->rpl_filter->is_on() && tables && !thd->spcont && + !thd->rpl_filter->tables_ok(thd->db, tables); } #endif @@ -1954,6 +1954,8 @@ mysql_execute_command(THD *thd) #ifdef HAVE_REPLICATION /* have table map for update for multi-update statement (BUG#37051) */ bool have_table_map_for_update= FALSE; + /* */ + Rpl_filter *rpl_filter= thd->rpl_filter; #endif DBUG_ENTER("mysql_execute_command"); #ifdef WITH_PARTITION_STORAGE_ENGINE @@ -2438,6 +2440,11 @@ case SQLCOM_PREPARE: else delete mi; } + else + { + mi->rpl_filter= get_or_create_rpl_filter(lex_mi->connection_name.str, + lex_mi->connection_name.length); + } mysql_mutex_unlock(&LOCK_active_mi); break; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 762b35da89a..4393809a6fe 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -3371,19 +3371,48 @@ static Sys_var_mybool Sys_relay_log_recovery( bool Sys_var_rpl_filter::global_update(THD *thd, set_var *var) { bool result= true; // Assume error + Master_info *mi; mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_active_mi); - if (!master_info_index->give_error_if_slave_running()) - result= set_filter_value(var->save_result.string_value.str); + + if (!var->base.length) // no base name + { + mi= master_info_index-> + get_master_info(&thd->variables.default_master_connection, + MYSQL_ERROR::WARN_LEVEL_ERROR); + } + else // has base name + { + mi= master_info_index-> + get_master_info(&var->base, + MYSQL_ERROR::WARN_LEVEL_WARN); + } + + if (mi) + { + if (mi->rli.slave_running) + { + my_error(ER_SLAVE_MUST_STOP, MYF(0), + mi->connection_name.length, + mi->connection_name.str); + result= true; + } + else + { + result= set_filter_value(var->save_result.string_value.str, mi); + } + } + mysql_mutex_unlock(&LOCK_active_mi); mysql_mutex_lock(&LOCK_global_system_variables); return result; } -bool Sys_var_rpl_filter::set_filter_value(const char *value) +bool Sys_var_rpl_filter::set_filter_value(const char *value, Master_info *mi) { bool status= true; + Rpl_filter* rpl_filter= mi ? mi->rpl_filter : global_rpl_filter; switch (opt_id) { case OPT_REPLICATE_DO_DB: @@ -3413,7 +3442,32 @@ uchar *Sys_var_rpl_filter::global_value_ptr(THD *thd, LEX_STRING *base) { char buf[256]; String tmp(buf, sizeof(buf), &my_charset_bin); + uchar *ret; + Master_info *mi; + Rpl_filter *rpl_filter; + mysql_mutex_unlock(&LOCK_global_system_variables); + mysql_mutex_lock(&LOCK_active_mi); + if (!base->length) // no base name + { + mi= master_info_index-> + get_master_info(&thd->variables.default_master_connection, + MYSQL_ERROR::WARN_LEVEL_ERROR); + } + else // has base name + { + mi= master_info_index-> + get_master_info(base, + MYSQL_ERROR::WARN_LEVEL_WARN); + } + mysql_mutex_lock(&LOCK_global_system_variables); + + if (!mi) + { + mysql_mutex_unlock(&LOCK_active_mi); + return 0; + } + rpl_filter= mi->rpl_filter; tmp.length(0); switch (opt_id) { @@ -3437,7 +3491,10 @@ uchar *Sys_var_rpl_filter::global_value_ptr(THD *thd, LEX_STRING *base) break; } - return (uchar *) thd->strmake(tmp.ptr(), tmp.length()); + ret= (uchar *) thd->strmake(tmp.ptr(), tmp.length()); + mysql_mutex_unlock(&LOCK_active_mi); + + return ret; } static Sys_var_rpl_filter Sys_replicate_do_db( diff --git a/sql/sys_vars.h b/sql/sys_vars.h index b04e3817406..f8b6537453c 100644 --- a/sql/sys_vars.h +++ b/sql/sys_vars.h @@ -28,6 +28,7 @@ #include "keycaches.h" #include "strfunc.h" #include "tztime.h" // my_tz_find, my_tz_SYSTEM, struct Time_zone +#include "rpl_mi.h" // For Multi-Source Replication /* a set of mostly trivial (as in f(X)=X) defines below to make system variable @@ -562,7 +563,7 @@ public: NO_ARG, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG, NULL, NULL, NULL), opt_id(getopt_id) { - option.var_type= GET_STR; + option.var_type= GET_STR | GET_ASK_ADDR; } bool do_check(THD *thd, set_var *var) @@ -588,7 +589,7 @@ public: protected: uchar *global_value_ptr(THD *thd, LEX_STRING *base); - bool set_filter_value(const char *value); + bool set_filter_value(const char *value, Master_info *mi); }; /** |