summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--repl-tests/test-repl-ts/repl-timestamp.master.reject2
-rw-r--r--sql/log_event.cc9
-rw-r--r--sql/log_event.h2
-rw-r--r--sql/mysql_priv.h1
-rw-r--r--sql/mysqlbinlog.cc2
-rw-r--r--sql/mysqld.cc31
-rw-r--r--sql/slave.cc54
-rw-r--r--sql/slave.h12
-rw-r--r--sql/sql_class.cc1
-rw-r--r--sql/sql_class.h3
-rw-r--r--sql/sql_parse.cc10
-rw-r--r--sql/sql_table.cc5
-rw-r--r--sql/sql_yacc.yy32
-rw-r--r--sql/table.h1
14 files changed, 133 insertions, 32 deletions
diff --git a/repl-tests/test-repl-ts/repl-timestamp.master.reject b/repl-tests/test-repl-ts/repl-timestamp.master.reject
index 37a76e7f360..091b18351ed 100644
--- a/repl-tests/test-repl-ts/repl-timestamp.master.reject
+++ b/repl-tests/test-repl-ts/repl-timestamp.master.reject
@@ -1,2 +1,2 @@
unix_timestamp(t)
-973302660
+973999074
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 62a3658bc9f..fc7ed2c5680 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -264,7 +264,7 @@ void Log_event::print_header(FILE* file)
{
fputc('#', file);
print_timestamp(file);
- fprintf(file, " server id %ld ", server_id);
+ fprintf(file, " server id %d ", server_id);
}
void Log_event::print_timestamp(FILE* file, time_t* ts)
@@ -421,12 +421,11 @@ Query_log_event::Query_log_event(FILE* file, time_t when_arg,
Query_log_event::Query_log_event(const char* buf, int event_len):
Log_event(buf),data_buf(0), query(NULL), db(NULL)
{
- if (event_len < QUERY_EVENT_OVERHEAD)
+ if ((uint)event_len < QUERY_EVENT_OVERHEAD)
return;
ulong data_len;
buf += EVENT_LEN_OFFSET;
data_len = event_len - QUERY_EVENT_OVERHEAD;
-
exec_time = uint4korr(buf + 8);
error_code = uint2korr(buf + 13);
@@ -613,7 +612,7 @@ Load_log_event::Load_log_event(const char* buf, int event_len):
{
ulong data_len;
- if(event_len < (LOAD_EVENT_OVERHEAD + LOG_EVENT_HEADER_LEN))
+ if((uint)event_len < (LOAD_EVENT_OVERHEAD + LOG_EVENT_HEADER_LEN))
return;
buf += EVENT_LEN_OFFSET;
@@ -709,7 +708,7 @@ void Load_log_event::print(FILE* file, bool short_form)
}
if((int)skip_lines > 0)
- fprintf(file, " IGNORE %ld LINES ", skip_lines);
+ fprintf(file, " IGNORE %d LINES ", skip_lines);
if(num_fields)
{
diff --git a/sql/log_event.h b/sql/log_event.h
index 33fcab4fea9..0e4121eb354 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -39,7 +39,7 @@
#define EVENT_LEN_OFFSET 9
#define EVENT_TYPE_OFFSET 4
#define MAX_EVENT_LEN 4*1024*1024
-#define QUERY_EVENT_OVERHEAD LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN
+#define QUERY_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN)
#define ROTATE_EVENT_OVERHEAD LOG_EVENT_HEADER_LEN
#define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN+sizeof(sql_ex_info))
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index bc76b128d54..7081c52ad1d 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -362,6 +362,7 @@ bool add_field_to_list(char *field_name, enum enum_field_types type,
void store_position_for_column(const char *name);
bool add_to_list(SQL_LIST &list,Item *group,bool asc=0);
TABLE_LIST *add_table_to_list(Table_ident *table,LEX_STRING *alias,
+ bool updating,
thr_lock_type flags=TL_UNLOCK,
List<String> *use_index=0,
List<String> *ignore_index=0);
diff --git a/sql/mysqlbinlog.cc b/sql/mysqlbinlog.cc
index 3756d8bf712..ee736d72792 100644
--- a/sql/mysqlbinlog.cc
+++ b/sql/mysqlbinlog.cc
@@ -295,7 +295,7 @@ Unfortunately, no sweepstakes today, adjusted position to 4\n");
len, net->read_pos[5]));
Log_event * ev = Log_event::read_log_event(
(const char*) net->read_pos + 1 ,
- len);
+ len - 1);
if(ev)
{
ev->print(stdout, short_form);
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index e9389727e9a..88a81f0fdeb 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -2230,7 +2230,8 @@ enum options {
OPT_SKIP_CONCURRENT_INSERT, OPT_MEMLOCK, OPT_MYISAM_RECOVER,
OPT_REPLICATE_REWRITE_DB, OPT_SERVER_ID, OPT_SKIP_SLAVE_START,
OPT_SKIP_INNOBASE,OPT_SAFEMALLOC_MEM_LIMIT,
- OPT_REPLICATE_DO_TABLE, OPT_REPLICATE_IGNORE_TABLE
+ OPT_REPLICATE_DO_TABLE, OPT_REPLICATE_IGNORE_TABLE,
+ OPT_REPL_WILD_DO_TABLE, OPT_REPL_WILD_IGNORE_TABLE
};
static struct option long_options[] = {
@@ -2300,10 +2301,14 @@ static struct option long_options[] = {
{"replicate-do-db", required_argument, 0, (int) OPT_REPLICATE_DO_DB},
{"replicate-do-table", required_argument, 0,
(int) OPT_REPLICATE_DO_TABLE},
+ {"replicate-wild-do-table", required_argument, 0,
+ (int) OPT_REPL_WILD_DO_TABLE},
{"replicate-ignore-db", required_argument, 0,
(int) OPT_REPLICATE_IGNORE_DB},
{"replicate-ignore-table", required_argument, 0,
(int) OPT_REPLICATE_IGNORE_TABLE},
+ {"replicate-wild-ignore-table", required_argument, 0,
+ (int) OPT_REPL_WILD_IGNORE_TABLE},
{"replicate-rewrite-db", required_argument, 0,
(int) OPT_REPLICATE_REWRITE_DB},
{"safe-mode", no_argument, 0, (int) OPT_SAFE},
@@ -2938,6 +2943,30 @@ static void get_options(int argc,char **argv)
binlog_do_db.push_back(db);
break;
}
+ case (int)OPT_REPLICATE_DO_TABLE:
+ {
+ if(!do_table_inited)
+ init_table_rule_hash(&replicate_do_table, &do_table_inited);
+ if(add_table_rule(&replicate_do_table, optarg))
+ {
+ fprintf(stderr, "could not add do table rule '%s'\n", optarg);
+ exit(1);
+ }
+ table_rules_on = 1;
+ break;
+ }
+ case (int)OPT_REPLICATE_IGNORE_TABLE:
+ {
+ if(!ignore_table_inited)
+ init_table_rule_hash(&replicate_ignore_table, &ignore_table_inited);
+ if(add_table_rule(&replicate_ignore_table, optarg))
+ {
+ fprintf(stderr, "could not add ignore table rule '%s'\n", optarg);
+ exit(1);
+ }
+ table_rules_on = 1;
+ break;
+ }
case (int) OPT_SQL_BIN_UPDATE_SAME:
opt_sql_bin_update = 1;
break;
diff --git a/sql/slave.cc b/sql/slave.cc
index a3f6f518ddc..d0128996832 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -27,6 +27,7 @@ pthread_t slave_real_id;
MASTER_INFO glob_mi;
HASH replicate_do_table, replicate_ignore_table;
bool do_table_inited = 0, ignore_table_inited = 0;
+bool table_rules_on = 0;
static inline void skip_load_data_infile(NET* net);
@@ -60,6 +61,52 @@ void init_table_rule_hash(HASH* h, bool* h_inited)
*h_inited = 1;
}
+int tables_ok(THD* thd, TABLE_LIST* tables)
+{
+ for(; tables; tables = tables->next)
+ {
+ if(!tables->updating)
+ continue;
+ char hash_key[2*NAME_LEN+2];
+ char* p;
+ p = strmov(hash_key, tables->db ? tables->db : thd->db);
+ *p++ = '.';
+ uint len = strmov(p, tables->real_name) - hash_key ;
+ if(do_table_inited) // if there are any do's
+ {
+ if(hash_search(&replicate_do_table, hash_key, len))
+ return 1;
+ }
+ if(ignore_table_inited) // if there are any do's
+ {
+ if(hash_search(&replicate_ignore_table, hash_key, len))
+ return 0;
+ }
+ }
+
+ return !do_table_inited; // if no explicit rule found
+ // and there was a do list, do not replicate. If there was
+ // no do list, go ahead
+}
+
+
+int add_table_rule(HASH* h, const char* table_spec)
+{
+ char* dot = strchr(table_spec, '.');
+ if(!dot) return 1;
+ uint len = (uint)strlen(table_spec);
+ if(!len) return 1;
+ TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
+ + len, MYF(MY_WME));
+ if(!e) return 1;
+ e->db = (char*)e + sizeof(TABLE_RULE_ENT);
+ e->tbl_name = e->db + (dot - table_spec) + 1;
+ e->key_len = len;
+ memcpy(e->db, table_spec, len);
+ (void)hash_insert(h, (byte*)e);
+ return 0;
+}
+
static inline bool slave_killed(THD* thd)
{
return abort_slave || abort_loop || thd->killed;
@@ -438,6 +485,7 @@ static int init_slave_thread(THD* thd)
thd->max_packet_length=thd->net.max_packet;
thd->master_access= ~0;
thd->priv_user = 0;
+ thd->slave_thread = 1;
thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0)
| OPTION_AUTO_COMMIT | OPTION_AUTO_IS_NULL) ;
thd->system_thread = 1;
@@ -655,14 +703,16 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
// unless set explictly
close_thread_tables(thd);
free_root(&thd->mem_root,0);
- delete ev;
if (thd->query_error)
{
sql_print_error("Slave: error running query '%s' ",
qev->query);
+ delete ev;
return 1;
}
+
+ delete ev;
if(thd->fatal_error)
{
@@ -897,6 +947,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
goto err;
thd->proc_info = "waiting to reconnect after a failed dump request";
+ vio_close(mysql->net.vio);
safe_sleep(thd, glob_mi.connect_retry);
if(slave_killed(thd))
goto err;
@@ -922,6 +973,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
if (event_len == packet_error)
{
thd->proc_info = "waiting to reconnect after a failed read";
+ vio_close(mysql->net.vio);
safe_sleep(thd, glob_mi.connect_retry);
if(slave_killed(thd))
goto err;
diff --git a/sql/slave.h b/sql/slave.h
index 67036b4120c..5df90cbf55b 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -67,11 +67,16 @@ int fetch_nx_table(THD* thd, MASTER_INFO* mi);
int show_master_info(THD* thd);
int show_binlog_info(THD* thd);
+int tables_ok(THD* thd, TABLE_LIST* tables);
+// see if the query uses any tables that should not be replicated
+
int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list );
// check to see if the database is ok to operate on with respect to the
// do and ignore lists - used in replication
+int add_table_rule(HASH* h, const char* table_spec);
+void init_table_rule_hash(HASH* h, bool* h_inited);
int init_master_info(MASTER_INFO* mi);
extern bool opt_log_slave_updates ;
@@ -81,7 +86,10 @@ extern bool slave_running;
extern pthread_t slave_real_id;
extern MASTER_INFO glob_mi;
extern HASH replicate_do_table, replicate_ignore_table;
-extern bool do_table_inited, ignore_table_inited;
+extern DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
+extern bool do_table_inited, ignore_table_inited,
+ wild_do_table_inited, wild_ignore_table_inited;
+extern bool table_rules_on;
// the master variables are defaults read from my.cnf or command line
extern uint master_port, master_connect_retry;
@@ -93,3 +101,5 @@ extern I_List<i_string_pair> replicate_rewrite_db;
extern I_List<THD> threads;
#endif
+
+
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index f86c0cf7461..5ffe96458f2 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -95,6 +95,7 @@ THD::THD()
update_lock_default= low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE;
start_time=(time_t) 0;
current_linfo = 0;
+ slave_thread = 0;
last_nx_table = last_nx_db = 0;
inactive_timeout=net_wait_timeout;
open_options=ha_open_options;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index e53ce950c1f..aef4c2c5c42 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -262,6 +262,7 @@ public:
pthread_t real_id;
uint current_tablenr,tmp_table,cond_count,col_access,query_length;
uint server_status,open_options;
+ bool slave_thread;
char scramble[9];
bool set_query_id,locked,count_cuted_fields,some_tables_deleted;
bool no_errors, allow_sum_func, password, fatal_error;
@@ -278,7 +279,7 @@ public:
bool store_globals();
inline time_t query_start() { query_start_used=1; return start_time; }
inline void set_time() { if (!user_time) time_after_lock=time(&start_time); }
- inline void end_time() { time(&start_time); }
+ inline void end_time() { if(!user_time) time(&start_time); }
inline void set_time(time_t t) { time_after_lock=start_time=t; user_time=1; }
inline void lock_time() { time(&time_after_lock); }
inline void insert_id(ulonglong id)
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 066885ef30a..acf055b5c74 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -843,6 +843,11 @@ mysql_execute_command(void)
TABLE_LIST *tables=(TABLE_LIST*) lex->table_list.first;
DBUG_ENTER("mysql_execute_command");
+ if(thd->slave_thread && table_rules_on && tables && !tables_ok(thd,tables))
+ DBUG_VOID_RETURN; // skip if we are in the slave thread, some table
+ // rules have been given and the table list says the query should not be
+ // replicated
+
switch (lex->sql_command) {
case SQLCOM_SELECT:
{
@@ -2344,9 +2349,11 @@ bool add_to_list(SQL_LIST &list,Item *item,bool asc)
TABLE_LIST *add_table_to_list(Table_ident *table, LEX_STRING *alias,
+ bool updating,
thr_lock_type flags,
List<String> *use_index,
- List<String> *ignore_index)
+ List<String> *ignore_index
+ )
{
register TABLE_LIST *ptr;
THD *thd=current_thd;
@@ -2378,6 +2385,7 @@ TABLE_LIST *add_table_to_list(Table_ident *table, LEX_STRING *alias,
ptr->real_name=table->table.str;
ptr->name=alias_str;
ptr->lock_type=flags;
+ ptr->updating=updating;
if (use_index)
ptr->use_index=(List<String> *) thd->memdup((gptr) use_index,
sizeof(*use_index));
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index bcc0e4257f2..3c7e5c3d180 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -67,8 +67,7 @@ int mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists)
error = 1;
goto err;
}
- while (global_read_lock && ! thd->killed ||
- thd->version != refresh_version)
+ while (global_read_lock && ! thd->killed)
{
(void) pthread_cond_wait(&COND_refresh,&LOCK_open);
}
@@ -156,7 +155,7 @@ int mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists)
if (wrong_tables.length())
{
my_error(ER_BAD_TABLE_ERROR,MYF(0),wrong_tables.c_ptr());
- DBUG_RETURN(-1);
+ error=1;
}
if(error)
DBUG_RETURN(-1);
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 5d62fbe84ba..26cb50009f7 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -628,7 +628,7 @@ create:
lex->sql_command= SQLCOM_CREATE_TABLE;
if (!add_table_to_list($5,
($2 & HA_LEX_CREATE_TMP_TABLE ?
- &tmp_table_alias : (LEX_STRING*) 0)))
+ &tmp_table_alias : (LEX_STRING*) 0),1))
YYABORT;
lex->create_list.empty();
lex->key_list.empty();
@@ -643,7 +643,7 @@ create:
| CREATE opt_unique_or_fulltext INDEX ident ON table_ident
{
Lex->sql_command= SQLCOM_CREATE_INDEX;
- if (!add_table_to_list($6,NULL))
+ if (!add_table_to_list($6,NULL,1))
YYABORT;
Lex->create_list.empty();
Lex->key_list.empty();
@@ -1022,7 +1022,7 @@ alter:
LEX *lex=Lex;
lex->sql_command = SQLCOM_ALTER_TABLE;
lex->name=0;
- if (!add_table_to_list($4, NULL))
+ if (!add_table_to_list($4, NULL,1))
YYABORT;
lex->drop_primary=0;
lex->create_list.empty();
@@ -1198,8 +1198,8 @@ table_to_table_list:
table_to_table:
table_ident TO_SYM table_ident
- { if (!add_table_to_list($1,NULL,TL_IGNORE) ||
- !add_table_to_list($3,NULL,TL_IGNORE))
+ { if (!add_table_to_list($1,NULL,1,TL_IGNORE) ||
+ !add_table_to_list($3,NULL,1,TL_IGNORE))
YYABORT;
}
@@ -1749,7 +1749,7 @@ normal_join:
join_table:
{ Lex->use_index_ptr=Lex->ignore_index_ptr=0; }
table_ident opt_table_alias opt_key_definition
- { if (!($$=add_table_to_list($2,$3,TL_UNLOCK, Lex->use_index_ptr,
+ { if (!($$=add_table_to_list($2,$3,0,TL_UNLOCK, Lex->use_index_ptr,
Lex->ignore_index_ptr))) YYABORT; }
| '{' ident join_table LEFT OUTER JOIN_SYM join_table ON expr '}'
{ add_join_on($7,$9); $7->outer_join|=JOIN_TYPE_LEFT; $$=$7; }
@@ -1953,7 +1953,7 @@ drop:
Lex->drop_list.empty();
Lex->drop_list.push_back(new Alter_drop(Alter_drop::KEY,
$3.str));
- if (!add_table_to_list($5,NULL))
+ if (!add_table_to_list($5,NULL, 1))
YYABORT;
}
| DROP DATABASE if_exists ident
@@ -1975,7 +1975,7 @@ table_list:
table:
table_ident
- { if (!add_table_to_list($1,NULL)) YYABORT; }
+ { if (!add_table_to_list($1,NULL,1)) YYABORT; }
if_exists:
/* empty */ { $$=0; }
@@ -2150,7 +2150,7 @@ show_param:
Lex->sql_command= SQLCOM_SHOW_FIELDS;
if ($4)
$3->change_db($4);
- if (!add_table_to_list($3,NULL))
+ if (!add_table_to_list($3,NULL,0))
YYABORT;
}
| MASTER_SYM LOGS_SYM
@@ -2162,7 +2162,7 @@ show_param:
Lex->sql_command= SQLCOM_SHOW_KEYS;
if ($4)
$3->change_db($4);
- if (!add_table_to_list($3,NULL))
+ if (!add_table_to_list($3,NULL,0))
YYABORT;
}
| STATUS_SYM wild
@@ -2179,7 +2179,7 @@ show_param:
| CREATE TABLE_SYM table_ident
{
Lex->sql_command = SQLCOM_SHOW_CREATE;
- if(!add_table_to_list($3, NULL))
+ if(!add_table_to_list($3, NULL,0))
YYABORT;
}
| MASTER_SYM STATUS_SYM
@@ -2205,7 +2205,7 @@ describe:
{
Lex->wild=0;
Lex->sql_command=SQLCOM_SHOW_FIELDS;
- if (!add_table_to_list($2, NULL))
+ if (!add_table_to_list($2, NULL,0))
YYABORT;
}
opt_describe_column
@@ -2290,14 +2290,14 @@ load: LOAD DATA_SYM opt_low_priority opt_local INFILE TEXT_STRING
opt_duplicate INTO TABLE_SYM table_ident opt_field_term opt_line_term
opt_ignore_lines opt_field_spec
{
- if (!add_table_to_list($11,NULL))
+ if (!add_table_to_list($11,NULL,1))
YYABORT;
}
|
LOAD TABLE_SYM table_ident FROM MASTER_SYM
{
Lex->sql_command = SQLCOM_LOAD_MASTER_TABLE;
- if (!add_table_to_list($3,NULL))
+ if (!add_table_to_list($3,NULL,1))
YYABORT;
}
@@ -2686,7 +2686,7 @@ table_lock_list:
table_lock:
table_ident opt_table_alias lock_option
- { if (!add_table_to_list($1,$2,(thr_lock_type) $3)) YYABORT; }
+ { if (!add_table_to_list($1,$2,0,(thr_lock_type) $3)) YYABORT; }
lock_option:
READ_SYM { $$=TL_READ_NO_INSERT; }
@@ -2791,7 +2791,7 @@ opt_table:
}
| table_ident
{
- if (!add_table_to_list($1,NULL))
+ if (!add_table_to_list($1,NULL,0))
YYABORT;
if (Lex->grant == UINT_MAX)
Lex->grant = TABLE_ACLS & ~GRANT_ACL;
diff --git a/sql/table.h b/sql/table.h
index 5bbb0210f38..8121271b479 100644
--- a/sql/table.h
+++ b/sql/table.h
@@ -138,4 +138,5 @@ typedef struct st_table_list {
thr_lock_type lock_type;
uint outer_join; /* Which join type */
bool straight; /* optimize with prev table */
+ bool updating; /* for replicate-do/ignore table */
} TABLE_LIST;