diff options
author | Sergey Petrunya <psergey@askmonty.org> | 2009-11-05 01:41:58 +0300 |
---|---|---|
committer | Sergey Petrunya <psergey@askmonty.org> | 2009-11-05 01:41:58 +0300 |
commit | 99d8d4402080270289f00465309c7c40c2e5d566 (patch) | |
tree | b6f999441b665f63a03e94ed4e6b3456fe28c3d4 | |
parent | 8f5b0279e28622c5af703c108bebd4a3a167595f (diff) | |
parent | 48c7a65f3bb2ebe6b33904ae670a20202c5ec8a3 (diff) | |
download | mariadb-git-99d8d4402080270289f00465309c7c40c2e5d566.tar.gz |
MWL#36: Add a mysqlbinlog option to change the used database
- Merge to MariaDB 5.2
-rw-r--r-- | client/Makefile.am | 3 | ||||
-rw-r--r-- | client/client_priv.h | 1 | ||||
-rw-r--r-- | client/mysqlbinlog.cc | 142 | ||||
-rw-r--r-- | client/sql_string.cc | 9 | ||||
-rw-r--r-- | client/sql_string.h | 5 | ||||
-rw-r--r-- | mysql-test/std_data/loaddata7.dat | 5 | ||||
-rw-r--r-- | mysql-test/suite/binlog/r/binlog_row_mysqlbinlog_options.result | 416 | ||||
-rw-r--r-- | mysql-test/suite/binlog/t/binlog_row_mysqlbinlog_options.test | 78 | ||||
-rw-r--r-- | sql/log_event.cc | 107 | ||||
-rw-r--r-- | sql/log_event.h | 18 | ||||
-rw-r--r-- | sql/mysql_priv.h | 4 | ||||
-rw-r--r-- | sql/mysqld.cc | 1 | ||||
-rw-r--r-- | sql/rpl_filter.cc | 9 | ||||
-rw-r--r-- | sql/rpl_filter.h | 5 | ||||
-rw-r--r-- | sql/sql_string.cc | 4 | ||||
-rw-r--r-- | sql/sql_string.h | 8 | ||||
-rw-r--r-- | sql/thr_malloc.cc | 2 |
17 files changed, 796 insertions, 21 deletions
diff --git a/client/Makefile.am b/client/Makefile.am index ccd0d8aada0..ee56662da01 100644 --- a/client/Makefile.am +++ b/client/Makefile.am @@ -107,7 +107,8 @@ sql_src=log_event.h mysql_priv.h rpl_constants.h \ rpl_utility.h rpl_tblmap.h rpl_tblmap.cc \ log_event.cc my_decimal.h my_decimal.cc \ log_event_old.h log_event_old.cc \ - rpl_record_old.h rpl_record_old.cc + rpl_record_old.h rpl_record_old.cc \ + sql_list.h rpl_filter.h sql_list.cc rpl_filter.cc strings_src=decimal.c link_sources: diff --git a/client/client_priv.h b/client/client_priv.h index 07b5cd72d84..cc618d51d8c 100644 --- a/client/client_priv.h +++ b/client/client_priv.h @@ -81,5 +81,6 @@ enum options_client OPT_DEBUG_INFO, OPT_DEBUG_CHECK, OPT_COLUMN_TYPES, OPT_ERROR_LOG_FILE, OPT_WRITE_BINLOG, OPT_DUMP_DATE, OPT_ABORT_SOURCE_ON_ERROR, + OPT_REWRITE_DB, OPT_MAX_CLIENT_OPTION }; diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index 82af7ca65f6..1d519294042 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -35,6 +35,15 @@ #include "log_event.h" #include "sql_common.h" +/* Needed for Rpl_filter */ +CHARSET_INFO* system_charset_info= &my_charset_utf8_general_ci; + +#include "sql_string.h" // needed for Rpl_filter +#include "sql_list.h" // needed for Rpl_filter +#include "rpl_filter.h" + +Rpl_filter *binlog_filter; + #define BIN_LOG_HEADER_SIZE 4 #define PROBE_HEADER_LEN (EVENT_LEN_OFFSET+4) @@ -616,6 +625,42 @@ static bool shall_skip_database(const char *log_dbname) /** + Print "use <db>" statement when current db is to be changed. + + We have to control emiting USE statements according to rewrite-db options. + We have to do it here (see process_event() below) and to suppress + producing USE statements by corresponding log event print-functions. +*/ + +void print_use_stmt(PRINT_EVENT_INFO* pinfo, const char* db, size_t db_len) +{ + // pinfo->db is the current db. + // If current db is the same as required db, do nothing. + if (!db || !memcmp(pinfo->db, db, db_len + 1)) + return; + + // Current db and required db are different. + // Check for rewrite rule for required db. (Note that in a rewrite rule + // neither db_from nor db_to part can be empty). + size_t len_to= 0; + const char *db_to= binlog_filter->get_rewrite_db(db, &len_to); + + // If there is no rewrite rule for db (in this case len_to is left = 0), + // printing of the corresponding USE statement is left for log event + // print-function. + if (!len_to) + return; + + // In case of rewrite rule print USE statement for db_to + fprintf(result_file, "use %s%s\n", db_to, pinfo->delimiter); + + // Copy the *original* db to pinfo to suppress emiting + // of USE stmts by log_event print-functions. + memcpy(pinfo->db, db, db_len + 1); +} + + +/** Prints the given event in base64 format. The header is printed to the head cache and the body is printed to @@ -726,8 +771,11 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, switch (ev_type) { case QUERY_EVENT: - if (shall_skip_database(((Query_log_event*)ev)->db)) + { + Query_log_event *qe= (Query_log_event*)ev; + if (shall_skip_database(qe->db)) goto end; + print_use_stmt(print_event_info, qe->db, qe->db_len); if (opt_base64_output_mode == BASE64_OUTPUT_ALWAYS) { if ((retval= write_event_header_and_base64(ev, result_file, @@ -738,6 +786,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, else ev->print(result_file, print_event_info); break; + } case CREATE_FILE_EVENT: { @@ -855,6 +904,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, if (!shall_skip_database(exlq->db)) { + print_use_stmt(print_event_info, exlq->db, exlq->db_len); if (fname) { convert_path_to_forward_slashes(fname); @@ -878,6 +928,13 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, destroy_evt= FALSE; goto end; } + size_t len_to= 0; + const char* db_to= binlog_filter->get_rewrite_db(map->get_db_name(), &len_to); + if (len_to && map->rewrite_db(db_to, len_to, glob_description_event)) + { + error("Could not rewrite database name"); + goto err; + } } case WRITE_ROWS_EVENT: case DELETE_ROWS_EVENT: @@ -962,14 +1019,16 @@ err: retval= ERROR_STOP; end: rec_count++; + /* - Destroy the log_event object. If reading from a remote host, - set the temp_buf to NULL so that memory isn't freed twice. + Destroy the log_event object. + MariaDB MWL#36: mainline does this: + If reading from a remote host, + set the temp_buf to NULL so that memory isn't freed twice. + We no longer do that, we use Rpl_filter::event_owns_temp_buf instead. */ if (ev) { - if (remote_opt) - ev->temp_buf= 0; if (destroy_evt) /* destroy it later if not set (ignored table map) */ delete ev; } @@ -1130,6 +1189,10 @@ that may lead to an endless loop.", "Used to reserve file descriptors for usage by this program", (uchar**) &open_files_limit, (uchar**) &open_files_limit, 0, GET_ULONG, REQUIRED_ARG, MY_NFILE, 8, OS_FILE_LIMIT, 0, 1, 0}, + {"rewrite-db", OPT_REWRITE_DB, + "Updates to a database with a different name than the original. \ +Example: rewrite-db='from->to'.", + 0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} }; @@ -1321,6 +1384,53 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), (find_type_or_exit(argument, &base64_output_mode_typelib, opt->name)-1); } break; + case OPT_REWRITE_DB: // db_from->db_to + { + /* See also handling of OPT_REPLICATE_REWRITE_DB in sql/mysqld.cc */ + char* ptr; + char* key= argument; // db-from + char* val; // db-to + + // Where key begins + while (*key && my_isspace(&my_charset_latin1, *key)) + key++; + + // Where val begins + if (!(ptr= strstr(argument, "->"))) + { + sql_print_error("Bad syntax in rewrite-db: missing '->'!\n"); + return 1; + } + val= ptr + 2; + while (*val && my_isspace(&my_charset_latin1, *val)) + val++; + + // Write \0 and skip blanks at the end of key + *ptr-- = 0; + while (my_isspace(&my_charset_latin1, *ptr) && ptr > argument) + *ptr-- = 0; + + if (!*key) + { + sql_print_error("Bad syntax in rewrite-db: empty db-from!\n"); + return 1; + } + + // Skip blanks at the end of val + ptr= val; + while (*ptr && !my_isspace(&my_charset_latin1, *ptr)) + ptr++; + *ptr= 0; + + if (!*val) + { + sql_print_error("Bad syntax in rewrite-db: empty db-to!\n"); + return 1; + } + + binlog_filter->add_db_rewrite(key, val); + break; + } case 'v': if (argument == disabled_my_option) verbose= 0; @@ -1586,7 +1696,7 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info, If reading from a remote host, ensure the temp_buf for the Log_event class is pointing to the incoming stream. */ - ev->register_temp_buf((char *) net->read_pos + 1); + ev->register_temp_buf((char *) net->read_pos + 1, FALSE); Log_event_type type= ev->get_type_code(); if (glob_description_event->binlog_version >= 3 || @@ -1986,6 +2096,8 @@ end: return retval; } +/* Used in sql_alloc(). Inited and freed in main() */ +MEM_ROOT s_mem_root; int main(int argc, char** argv) { @@ -1998,6 +2110,13 @@ int main(int argc, char** argv) my_init_time(); // for time functions + init_alloc_root(&s_mem_root, 16384, 0); + if (!(binlog_filter= new Rpl_filter)) + { + error("Failed to create Rpl_filter"); + exit(1); + } + parse_args(&argc, (char***)&argv); defaults_argv=argv; @@ -2084,6 +2203,8 @@ int main(int argc, char** argv) if (result_file != stdout) my_fclose(result_file, MYF(0)); cleanup(); + delete binlog_filter; + free_root(&s_mem_root, MYF(0)); free_defaults(defaults_argv); my_free_open_file_info(); load_processor.destroy(); @@ -2095,6 +2216,12 @@ int main(int argc, char** argv) DBUG_RETURN(retval == ERROR_STOP ? 1 : 0); } + +void *sql_alloc(size_t size) +{ + return alloc_root(&s_mem_root, size); +} + /* We must include this here as it's compiled with different options for the server @@ -2105,4 +2232,7 @@ int main(int argc, char** argv) #include "my_decimal.cc" #include "log_event.cc" #include "log_event_old.cc" +#include "sql_string.cc" +#include "sql_list.cc" +#include "rpl_filter.cc" diff --git a/client/sql_string.cc b/client/sql_string.cc index dc6147b563f..9fa8fae2860 100644 --- a/client/sql_string.cc +++ b/client/sql_string.cc @@ -26,15 +26,6 @@ #ifdef HAVE_FCONVERT #include <floatingpoint.h> #endif - -/* - The following extern declarations are ok as these are interface functions - required by the string function -*/ - -extern void sql_alloc(size_t size); -extern void sql_element_free(void *ptr); - #include "sql_string.h" /***************************************************************************** diff --git a/client/sql_string.h b/client/sql_string.h index da19c1ccfe5..0a23f9ee92f 100644 --- a/client/sql_string.h +++ b/client/sql_string.h @@ -15,6 +15,9 @@ /* This file is originally from the mysql distribution. Coded by monty */ +#ifndef CLIENT_SQL_STRING_H +#define CLIENT_SQL_STRING_H + #ifdef USE_PRAGMA_INTERFACE #pragma interface /* gcc class implementation */ #endif @@ -353,3 +356,5 @@ public: return (s->alloced && Ptr >= s->Ptr && Ptr < s->Ptr + s->str_length); } }; + +#endif diff --git a/mysql-test/std_data/loaddata7.dat b/mysql-test/std_data/loaddata7.dat new file mode 100644 index 00000000000..e86d1621bd4 --- /dev/null +++ b/mysql-test/std_data/loaddata7.dat @@ -0,0 +1,5 @@ +2,2
+3,3
+4,4
+5,5
+6,6
\ No newline at end of file diff --git a/mysql-test/suite/binlog/r/binlog_row_mysqlbinlog_options.result b/mysql-test/suite/binlog/r/binlog_row_mysqlbinlog_options.result new file mode 100644 index 00000000000..5a6fbc953b4 --- /dev/null +++ b/mysql-test/suite/binlog/r/binlog_row_mysqlbinlog_options.result @@ -0,0 +1,416 @@ +DROP DATABASE IF EXISTS test1; +DROP DATABASE IF EXISTS test2; +DROP DATABASE IF EXISTS test3; +CREATE DATABASE test1; +CREATE DATABASE test2; +CREATE DATABASE test3; +SET timestamp=1000000000; +RESET MASTER; +USE test1; +CREATE TABLE t1 (a INT, b INT); +INSERT INTO t1 VALUES (1,1),(2,2); +USE test2; +CREATE TABLE t2 (a INT); +INSERT INTO t2 VALUES (1),(2); +DELETE FROM test1.t1 WHERE a=1; +USE test3; +CREATE TABLE t3 (a INT); +INSERT INTO t3 VALUES (1),(2); +INSERT INTO test1.t1 VALUES (3,3); +USE test1; +LOAD DATA INFILE '../../std_data/loaddata7.dat' INTO TABLE t1 +FIELDS TERMINATED BY ',' LINES TERMINATED BY '\r\n'; +DELETE FROM test3.t3 WHERE a=1; +flush logs; +# +# mysqlbinlog output +# --base64-output = decode-rows +# --rewrite-db = test1->new_test1 +# --rewrite-db = test3->new_test3 +# +/*!40019 SET @@session.max_insert_delayed_threads=0*/; +/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/; +DELIMITER /*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Start: binlog v 4, server v #.##.## created 010909 4:46:40 at startup +ROLLBACK/*!*/; +# at # +use new_test1/*!*/; +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +SET @@session.pseudo_thread_id=#/*!*/; +SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=1, @@session.unique_checks=1, @@session.autocommit=1/*!*/; +SET @@session.sql_mode=0/*!*/; +SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/; +/*!\C latin1 *//*!*/; +SET @@session.character_set_client=8,@@session.collation_connection=8,@@session.collation_server=8/*!*/; +SET @@session.lc_time_names=0/*!*/; +SET @@session.collation_database=DEFAULT/*!*/; +CREATE TABLE t1 (a INT, b INT) +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `new_test1`.`t1` mapped to number # +#010909 4:46:40 server id # end_log_pos # Write_rows: table id # flags: STMT_END_F +### INSERT INTO new_test1.t1 +### SET +### @1=1 /* INT meta=0 nullable=1 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO new_test1.t1 +### SET +### @1=2 /* INT meta=0 nullable=1 is_null=0 */ +### @2=2 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +use test2/*!*/; +SET TIMESTAMP=1000000000/*!*/; +CREATE TABLE t2 (a INT) +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `test2`.`t2` mapped to number # +#010909 4:46:40 server id # end_log_pos # Write_rows: table id # flags: STMT_END_F +### INSERT INTO test2.t2 +### SET +### @1=1 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO test2.t2 +### SET +### @1=2 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `new_test1`.`t1` mapped to number # +#010909 4:46:40 server id # end_log_pos # Delete_rows: table id # flags: STMT_END_F +### DELETE FROM new_test1.t1 +### WHERE +### @1=1 /* INT meta=0 nullable=1 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +use new_test3/*!*/; +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +CREATE TABLE t3 (a INT) +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `new_test3`.`t3` mapped to number # +#010909 4:46:40 server id # end_log_pos # Write_rows: table id # flags: STMT_END_F +### INSERT INTO new_test3.t3 +### SET +### @1=1 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO new_test3.t3 +### SET +### @1=2 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `new_test1`.`t1` mapped to number # +#010909 4:46:40 server id # end_log_pos # Write_rows: table id # flags: STMT_END_F +### INSERT INTO new_test1.t1 +### SET +### @1=3 /* INT meta=0 nullable=1 is_null=0 */ +### @2=3 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +use new_test1/*!*/; +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `new_test1`.`t1` mapped to number # +#010909 4:46:40 server id # end_log_pos # Write_rows: table id # flags: STMT_END_F +### INSERT INTO new_test1.t1 +### SET +### @1=2 /* INT meta=0 nullable=1 is_null=0 */ +### @2=2 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO new_test1.t1 +### SET +### @1=3 /* INT meta=0 nullable=1 is_null=0 */ +### @2=3 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO new_test1.t1 +### SET +### @1=4 /* INT meta=0 nullable=1 is_null=0 */ +### @2=4 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO new_test1.t1 +### SET +### @1=5 /* INT meta=0 nullable=1 is_null=0 */ +### @2=5 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO new_test1.t1 +### SET +### @1=6 /* INT meta=0 nullable=1 is_null=0 */ +### @2=6 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `new_test3`.`t3` mapped to number # +#010909 4:46:40 server id # end_log_pos # Delete_rows: table id # flags: STMT_END_F +### DELETE FROM new_test3.t3 +### WHERE +### @1=1 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Rotate to master-bin.000002 pos: 4 +DELIMITER ; +# End of log file +ROLLBACK /* added by mysqlbinlog */; +/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/; +# +# mysqlbinlog output +# --base64-output = decode-rows +# --rewrite-db = test1->new_test1 +# --rewrite-db = test3->new_test3 +# --read-from-remote-server +# +/*!40019 SET @@session.max_insert_delayed_threads=0*/; +/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/; +DELIMITER /*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Start: binlog v 4, server v #.##.## created 010909 4:46:40 at startup +ROLLBACK/*!*/; +# at # +use new_test1/*!*/; +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +SET @@session.pseudo_thread_id=#/*!*/; +SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=1, @@session.unique_checks=1, @@session.autocommit=1/*!*/; +SET @@session.sql_mode=0/*!*/; +SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/; +/*!\C latin1 *//*!*/; +SET @@session.character_set_client=8,@@session.collation_connection=8,@@session.collation_server=8/*!*/; +SET @@session.lc_time_names=0/*!*/; +SET @@session.collation_database=DEFAULT/*!*/; +CREATE TABLE t1 (a INT, b INT) +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `new_test1`.`t1` mapped to number # +#010909 4:46:40 server id # end_log_pos # Write_rows: table id # flags: STMT_END_F +### INSERT INTO new_test1.t1 +### SET +### @1=1 /* INT meta=0 nullable=1 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO new_test1.t1 +### SET +### @1=2 /* INT meta=0 nullable=1 is_null=0 */ +### @2=2 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +use test2/*!*/; +SET TIMESTAMP=1000000000/*!*/; +CREATE TABLE t2 (a INT) +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `test2`.`t2` mapped to number # +#010909 4:46:40 server id # end_log_pos # Write_rows: table id # flags: STMT_END_F +### INSERT INTO test2.t2 +### SET +### @1=1 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO test2.t2 +### SET +### @1=2 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `new_test1`.`t1` mapped to number # +#010909 4:46:40 server id # end_log_pos # Delete_rows: table id # flags: STMT_END_F +### DELETE FROM new_test1.t1 +### WHERE +### @1=1 /* INT meta=0 nullable=1 is_null=0 */ +### @2=1 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +use new_test3/*!*/; +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +CREATE TABLE t3 (a INT) +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `new_test3`.`t3` mapped to number # +#010909 4:46:40 server id # end_log_pos # Write_rows: table id # flags: STMT_END_F +### INSERT INTO new_test3.t3 +### SET +### @1=1 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO new_test3.t3 +### SET +### @1=2 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `new_test1`.`t1` mapped to number # +#010909 4:46:40 server id # end_log_pos # Write_rows: table id # flags: STMT_END_F +### INSERT INTO new_test1.t1 +### SET +### @1=3 /* INT meta=0 nullable=1 is_null=0 */ +### @2=3 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +use new_test1/*!*/; +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `new_test1`.`t1` mapped to number # +#010909 4:46:40 server id # end_log_pos # Write_rows: table id # flags: STMT_END_F +### INSERT INTO new_test1.t1 +### SET +### @1=2 /* INT meta=0 nullable=1 is_null=0 */ +### @2=2 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO new_test1.t1 +### SET +### @1=3 /* INT meta=0 nullable=1 is_null=0 */ +### @2=3 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO new_test1.t1 +### SET +### @1=4 /* INT meta=0 nullable=1 is_null=0 */ +### @2=4 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO new_test1.t1 +### SET +### @1=5 /* INT meta=0 nullable=1 is_null=0 */ +### @2=5 /* INT meta=0 nullable=1 is_null=0 */ +### INSERT INTO new_test1.t1 +### SET +### @1=6 /* INT meta=0 nullable=1 is_null=0 */ +### @2=6 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +BEGIN +/*!*/; +# at # +# at # +#010909 4:46:40 server id # end_log_pos # Table_map: `new_test3`.`t3` mapped to number # +#010909 4:46:40 server id # end_log_pos # Delete_rows: table id # flags: STMT_END_F +### DELETE FROM new_test3.t3 +### WHERE +### @1=1 /* INT meta=0 nullable=1 is_null=0 */ +# at # +#010909 4:46:40 server id # end_log_pos # Query thread_id=# exec_time=# error_code=0 +SET TIMESTAMP=1000000000/*!*/; +COMMIT +/*!*/; +# at # +#010909 4:46:40 server id # end_log_pos # Rotate to master-bin.000002 pos: 4 +DELIMITER ; +# End of log file +ROLLBACK /* added by mysqlbinlog */; +/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/; +DROP DATABASE test1; +DROP DATABASE test2; +DROP DATABASE test3; diff --git a/mysql-test/suite/binlog/t/binlog_row_mysqlbinlog_options.test b/mysql-test/suite/binlog/t/binlog_row_mysqlbinlog_options.test new file mode 100644 index 00000000000..c1756b22eab --- /dev/null +++ b/mysql-test/suite/binlog/t/binlog_row_mysqlbinlog_options.test @@ -0,0 +1,78 @@ +--source include/have_log_bin.inc +--source include/have_binlog_format_row.inc + +# +# MWL36: Add a mysqlbinlog option to change the used database +# (Adding --rewrite-db option) +# +--disable_warnings +DROP DATABASE IF EXISTS test1; +DROP DATABASE IF EXISTS test2; +DROP DATABASE IF EXISTS test3; +--enable_warnings + +# For SBR --rewrite-db affects only default database and doesn't affect +# a query (specifically CREATE DATABASE) itself. Hence (for testing +# purpose) we start binary logging after all databases have been created. + +CREATE DATABASE test1; +CREATE DATABASE test2; +CREATE DATABASE test3; + +# Fix timestamp to avoid varying results. +SET timestamp=1000000000; + +# Delete all existing binary logs. +RESET MASTER; + +# Whe'll call mysqlbinlog with two rewrite rules: +# --rewrite-db="test1->new_test1" +# --rewrite-db="test3->new_test3" + +USE test1; +CREATE TABLE t1 (a INT, b INT); +INSERT INTO t1 VALUES (1,1),(2,2); + +USE test2; +CREATE TABLE t2 (a INT); +INSERT INTO t2 VALUES (1),(2); + +DELETE FROM test1.t1 WHERE a=1; + +USE test3; +CREATE TABLE t3 (a INT); +INSERT INTO t3 VALUES (1),(2); +INSERT INTO test1.t1 VALUES (3,3); + +USE test1; +LOAD DATA INFILE '../../std_data/loaddata7.dat' INTO TABLE t1
+ FIELDS TERMINATED BY ',' LINES TERMINATED BY '\r\n';
+DELETE FROM test3.t3 WHERE a=1; + +flush logs; + +--echo # +--echo # mysqlbinlog output +--echo # --base64-output = decode-rows +--echo # --rewrite-db = test1->new_test1 +--echo # --rewrite-db = test3->new_test3 +--echo # + +let $MYSQLD_DATADIR= `select @@datadir`; +--replace_regex /server id [0-9]*/server id #/ /server v [^ ]*/server v #.##.##/ /exec_time=[0-9]*/exec_time=#/ /thread_id=[0-9]*/thread_id=#/ /table id [0-9]*/table id #/ /mapped to number [0-9]*/mapped to number #/ /end_log_pos [0-9]*/end_log_pos #/ /# at [0-9]*/# at #/ +--exec $MYSQL_BINLOG --base64-output=decode-rows --rewrite-db="test1->new_test1" --rewrite-db="test3->new_test3" -v -v $MYSQLD_DATADIR/master-bin.000001 + +--echo # +--echo # mysqlbinlog output +--echo # --base64-output = decode-rows +--echo # --rewrite-db = test1->new_test1 +--echo # --rewrite-db = test3->new_test3 +--echo # --read-from-remote-server +--echo # + +--replace_regex /server id [0-9]*/server id #/ /server v [^ ]*/server v #.##.##/ /exec_time=[0-9]*/exec_time=#/ /thread_id=[0-9]*/thread_id=#/ /table id [0-9]*/table id #/ /mapped to number [0-9]*/mapped to number #/ /end_log_pos [0-9]*/end_log_pos #/ /# at [0-9]*/# at #/ +--exec $MYSQL_BINLOG --base64-output=decode-rows --rewrite-db="test1->new_test1" --rewrite-db="test3->new_test3" -v -v --read-from-remote-server --user=root --host=localhost --port=$MASTER_MYPORT master-bin.000001 + +DROP DATABASE test1; +DROP DATABASE test2; +DROP DATABASE test3; diff --git a/sql/log_event.cc b/sql/log_event.cc index 7024c799609..2c7df76fdfe 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -1121,7 +1121,7 @@ failed my_b_read")); goto err; } if ((res= read_log_event(buf, data_len, &error, description_event))) - res->register_temp_buf(buf); + res->register_temp_buf(buf, TRUE); err: UNLOCK_MUTEX; @@ -8005,6 +8005,111 @@ Table_map_log_event::~Table_map_log_event() my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); } + +#ifdef MYSQL_CLIENT + +/* + Rewrite database name for the event to name specified by new_db + SYNOPSIS + new_db Database name to change to + new_len Length + desc Event describing binlog that we're writing to. + + DESCRIPTION + Reset db name. This function assumes that temp_buf member contains event + representation taken from a binary log. It resets m_dbnam and m_dblen and + rewrites temp_buf with new db name. + + RETURN + 0 - Success + other - Error +*/ + +int Table_map_log_event::rewrite_db(const char* new_db, size_t new_len, + const Format_description_log_event* desc) +{ + DBUG_ENTER("Table_map_log_event::rewrite_db"); + DBUG_ASSERT(temp_buf); + + uint header_len= min(desc->common_header_len, + LOG_EVENT_MINIMAL_HEADER_LEN) + TABLE_MAP_HEADER_LEN; + int len_diff; + + if (!(len_diff= new_len - m_dblen)) + { + memcpy((void*) (temp_buf + header_len + 1), new_db, m_dblen + 1); + memcpy((void*) m_dbnam, new_db, m_dblen + 1); + DBUG_RETURN(0); + } + + // Create new temp_buf + ulong event_cur_len= uint4korr(temp_buf + EVENT_LEN_OFFSET); + ulong event_new_len= event_cur_len + len_diff; + char* new_temp_buf= (char*) my_malloc(event_new_len, MYF(MY_WME)); + + if (!new_temp_buf) + { + sql_print_error("Table_map_log_event::rewrite_db: " + "failed to allocate new temp_buf (%d bytes required)", + event_new_len); + DBUG_RETURN(-1); + } + + // Rewrite temp_buf + char* ptr= new_temp_buf; + ulong cnt= 0; + + // Copy header and change event length + memcpy(ptr, temp_buf, header_len); + int4store(ptr + EVENT_LEN_OFFSET, event_new_len); + ptr += header_len; + cnt += header_len; + + // Write new db name length and new name + *ptr++ = new_len; + memcpy(ptr, new_db, new_len + 1); + ptr += new_len + 1; + cnt += m_dblen + 2; + + // Copy rest part + memcpy(ptr, temp_buf + cnt, event_cur_len - cnt); + + // Reregister temp buf + free_temp_buf(); + register_temp_buf(new_temp_buf, TRUE); + + // Reset m_dbnam and m_dblen members + m_dblen= new_len; + + // m_dbnam resides in m_memory together with m_tblnam and m_coltype + uchar* memory= m_memory; + char const* tblnam= m_tblnam; + uchar* coltype= m_coltype; + + m_memory= (uchar*) my_multi_malloc(MYF(MY_WME), + &m_dbnam, (uint) m_dblen + 1, + &m_tblnam, (uint) m_tbllen + 1, + &m_coltype, (uint) m_colcnt, + NullS); + + if (!m_memory) + { + sql_print_error("Table_map_log_event::rewrite_db: " + "failed to allocate new m_memory (%d + %d + %d bytes required)", + m_dblen + 1, m_tbllen + 1, m_colcnt); + DBUG_RETURN(-1); + } + + memcpy((void*)m_dbnam, new_db, m_dblen + 1); + memcpy((void*)m_tblnam, tblnam, m_tbllen + 1); + memcpy(m_coltype, coltype, m_colcnt); + + my_free(memory, MYF(MY_WME)); + DBUG_RETURN(0); +} +#endif /* MYSQL_CLIENT */ + + /* Return value is an error code, one of: diff --git a/sql/log_event.h b/sql/log_event.h index 13ab4330415..41451a618a7 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -873,6 +873,13 @@ public: event's type, and its content is distributed in the event-specific fields. */ char *temp_buf; + + /* + TRUE <=> this event 'owns' temp_buf and should call my_free() when done + with it + */ + bool event_owns_temp_buf; + /* Timestamp on the master(for debugging and replication of NOW()/TIMESTAMP). It is important for queries and LOAD DATA @@ -1014,12 +1021,17 @@ public: Log_event(const char* buf, const Format_description_log_event *description_event); virtual ~Log_event() { free_temp_buf();} - void register_temp_buf(char* buf) { temp_buf = buf; } + void register_temp_buf(char* buf, bool must_free) + { + temp_buf= buf; + event_owns_temp_buf= must_free; + } void free_temp_buf() { if (temp_buf) { - my_free(temp_buf, MYF(0)); + if (event_owns_temp_buf) + my_free(temp_buf, MYF(0)); temp_buf = 0; } } @@ -3310,6 +3322,8 @@ public: ulong get_table_id() const { return m_table_id; } const char *get_table_name() const { return m_tblnam; } const char *get_db_name() const { return m_dbnam; } + int rewrite_db(const char* new_name, size_t new_name_len, + const Format_description_log_event*); #endif virtual Log_event_type get_type_code() { return TABLE_MAP_EVENT; } diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index a277ad59841..fed03e31f9d 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -92,12 +92,16 @@ extern MYSQL_PLUGIN_IMPORT const char *primary_key_name; #include "unireg.h" void init_sql_alloc(MEM_ROOT *root, uint block_size, uint pre_alloc_size); +#endif // MYSQL_CLIENT + void *sql_alloc(size_t); void *sql_calloc(size_t); char *sql_strdup(const char *str); char *sql_strmake(const char *str, size_t len); void *sql_memdup(const void * ptr, size_t size); void sql_element_free(void *ptr); + +#ifndef MYSQL_CLIENT char *sql_strmake_with_convert(const char *str, size_t arg_length, CHARSET_INFO *from_cs, size_t max_res_length, diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 59008bed7ee..7faf3b2d426 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -8245,6 +8245,7 @@ mysqld_get_one_option(int optid, } case (int)OPT_REPLICATE_REWRITE_DB: { + /* See also OPT_REWRITE_DB handling in client/mysqlbinlog.cc */ char* key = argument,*p, *val; if (!(p= strstr(argument, "->"))) diff --git a/sql/rpl_filter.cc b/sql/rpl_filter.cc index 68272c58bb1..55dc71e3405 100644 --- a/sql/rpl_filter.cc +++ b/sql/rpl_filter.cc @@ -45,6 +45,7 @@ Rpl_filter::~Rpl_filter() } +#ifndef MYSQL_CLIENT /* Returns true if table should be logged/replicated @@ -129,6 +130,7 @@ Rpl_filter::tables_ok(const char* db, TABLE_LIST* tables) !do_table_inited && !wild_do_table_inited); } +#endif /* Checks whether a db matches some do_db and ignore_db rules @@ -514,6 +516,13 @@ Rpl_filter::get_wild_ignore_table(String* str) } +bool +Rpl_filter::rewrite_db_is_empty() +{ + return rewrite_db.is_empty(); +} + + const char* Rpl_filter::get_rewrite_db(const char* db, size_t *new_len) { diff --git a/sql/rpl_filter.h b/sql/rpl_filter.h index ff7e4081bb1..88951600e56 100644 --- a/sql/rpl_filter.h +++ b/sql/rpl_filter.h @@ -42,7 +42,9 @@ public: /* Checks - returns true if ok to replicate/log */ - bool tables_ok(const char* db, TABLE_LIST* tables); +#ifndef MYSQL_CLIENT + bool tables_ok(const char* db, TABLE_LIST *tables); +#endif bool db_ok(const char* db); bool db_ok_with_wild_table(const char *db); @@ -69,6 +71,7 @@ public: void get_wild_do_table(String* str); void get_wild_ignore_table(String* str); + bool rewrite_db_is_empty(); const char* get_rewrite_db(const char* db, size_t *new_len); I_List<i_string>* get_do_db(); diff --git a/sql/sql_string.cc b/sql/sql_string.cc index bdecabd782b..db3d07a28a4 100644 --- a/sql/sql_string.cc +++ b/sql/sql_string.cc @@ -26,9 +26,11 @@ #ifdef HAVE_FCONVERT #include <floatingpoint.h> #endif - #include "sql_string.h" +#ifdef MYSQL_CLIENT +#error Attempt to use server-side sql_string on client. Use client/sql_string.cc +#endif /***************************************************************************** ** String functions *****************************************************************************/ diff --git a/sql/sql_string.h b/sql/sql_string.h index 9e22f7b6a27..a9df0dc2620 100644 --- a/sql/sql_string.h +++ b/sql/sql_string.h @@ -1,3 +1,5 @@ +#ifndef MYSQL_SQL_STRING_H_INCLUDED +#define MYSQL_SQL_STRING_H_INCLUDED /* Copyright (C) 2000 MySQL AB This program is free software; you can redistribute it and/or modify @@ -23,6 +25,10 @@ #define NOT_FIXED_DEC 31 #endif +#ifdef MYSQL_CLIENT +#error Attempt to use server-side sql_string on client. Use client/sql_string.h +#endif + class String; int sortcmp(const String *a,const String *b, CHARSET_INFO *cs); String *copy_if_not_alloced(String *a,String *b,uint32 arg_length); @@ -394,3 +400,5 @@ static inline bool check_if_only_end_space(CHARSET_INFO *cs, char *str, { return str+ cs->cset->scan(cs, str, end, MY_SEQ_SPACES) == end; } + +#endif // MYSQL_SQL_STRING_H_INCLUDED diff --git a/sql/thr_malloc.cc b/sql/thr_malloc.cc index 0764fe8be33..83c4a8ee2a0 100644 --- a/sql/thr_malloc.cc +++ b/sql/thr_malloc.cc @@ -59,11 +59,13 @@ void init_sql_alloc(MEM_ROOT *mem_root, uint block_size, uint pre_alloc) } +#ifndef MYSQL_CLIENT void *sql_alloc(size_t Size) { MEM_ROOT *root= *my_pthread_getspecific_ptr(MEM_ROOT**,THR_MALLOC); return alloc_root(root,Size); } +#endif void *sql_calloc(size_t size) |