diff options
-rw-r--r-- | client/mysqlbinlog.cc | 21 | ||||
-rw-r--r-- | include/my_sys.h | 4 | ||||
-rw-r--r-- | mysql-test/suite/binlog/r/binlog_base64_flag.result | 19 | ||||
-rw-r--r-- | mysql-test/suite/binlog/r/binlog_mysqlbinlog_row_frag.result | 24 | ||||
-rw-r--r-- | mysql-test/suite/binlog/t/binlog_base64_flag.test | 22 | ||||
-rw-r--r-- | mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test | 45 | ||||
-rw-r--r-- | mysys/mf_iocache2.c | 61 | ||||
-rw-r--r-- | sql/item_func.cc | 2 | ||||
-rw-r--r-- | sql/item_func.h | 4 | ||||
-rw-r--r-- | sql/log_event.cc | 169 | ||||
-rw-r--r-- | sql/log_event.h | 13 | ||||
-rw-r--r-- | sql/log_event_old.cc | 16 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 94 | ||||
-rw-r--r-- | sql/sql_lex.h | 4 | ||||
-rw-r--r-- | sql/sql_yacc.yy | 11 | ||||
-rw-r--r-- | unittest/sql/mf_iocache-t.cc | 70 |
16 files changed, 511 insertions, 68 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index bc13aa6c2cc..2c05bb806a9 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -72,6 +72,7 @@ ulong mysqld_net_retry_count = 10L; ulong open_files_limit; ulong opt_binlog_rows_event_max_size; ulonglong test_flags = 0; +ulong opt_binlog_rows_event_max_encoded_size= MAX_MAX_ALLOWED_PACKET; static uint opt_protocol= 0; static FILE *result_file; static char *result_file_name= 0; @@ -813,7 +814,12 @@ write_event_header_and_base64(Log_event *ev, FILE *result_file, /* Write header and base64 output to cache */ ev->print_header(head, print_event_info, FALSE); - ev->print_base64(body, print_event_info, FALSE); + + DBUG_ASSERT(print_event_info->base64_output_mode == BASE64_OUTPUT_ALWAYS); + + ev->print_base64(body, print_event_info, + print_event_info->base64_output_mode != + BASE64_OUTPUT_DECODE_ROWS); /* Read data from cache and write to result file */ if (copy_event_cache_to_file_and_reinit(head, result_file) || @@ -852,7 +858,9 @@ static bool print_base64(PRINT_EVENT_INFO *print_event_info, Log_event *ev) return 1; } ev->print(result_file, print_event_info); - return print_event_info->head_cache.error == -1; + return + print_event_info->head_cache.error == -1 || + print_event_info->body_cache.error == -1; } @@ -1472,6 +1480,15 @@ that may lead to an endless loop.", "This value must be a multiple of 256.", &opt_binlog_rows_event_max_size, &opt_binlog_rows_event_max_size, 0, GET_ULONG, REQUIRED_ARG, UINT_MAX, 256, ULONG_MAX, 0, 256, 0}, +#ifndef DBUG_OFF + {"debug-binlog-row-event-max-encoded-size", 0, + "The maximum size of base64-encoded rows-event in one BINLOG pseudo-query " + "instance. When the computed actual size exceeds the limit " + "the BINLOG's argument string is fragmented in two.", + &opt_binlog_rows_event_max_encoded_size, + &opt_binlog_rows_event_max_encoded_size, 0, + GET_ULONG, REQUIRED_ARG, UINT_MAX/4, 256, ULONG_MAX, 0, 256, 0}, +#endif {"verify-binlog-checksum", 'c', "Verify checksum binlog events.", (uchar**) &opt_verify_binlog_checksum, (uchar**) &opt_verify_binlog_checksum, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, diff --git a/include/my_sys.h b/include/my_sys.h index 110a2ee9af3..c30580a8c06 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -602,7 +602,9 @@ static inline size_t my_b_bytes_in_cache(const IO_CACHE *info) return *info->current_end - *info->current_pos; } -int my_b_copy_to_file(IO_CACHE *cache, FILE *file); +int my_b_copy_to_file (IO_CACHE *cache, FILE *file, size_t count); +int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file); + my_off_t my_b_append_tell(IO_CACHE* info); my_off_t my_b_safe_tell(IO_CACHE* info); /* picks the correct tell() */ int my_b_pread(IO_CACHE *info, uchar *Buffer, size_t Count, my_off_t pos); diff --git a/mysql-test/suite/binlog/r/binlog_base64_flag.result b/mysql-test/suite/binlog/r/binlog_base64_flag.result index d13e13c97b0..b97cf9072fa 100644 --- a/mysql-test/suite/binlog/r/binlog_base64_flag.result +++ b/mysql-test/suite/binlog/r/binlog_base64_flag.result @@ -28,6 +28,25 @@ a 1 1 3 +DELETE FROM t1 WHERE a=3; +BINLOG ' +ODdYRw8BAAAAZgAAAGoAAAABAAQANS4xLjIzLXJjLWRlYnVnLWxvZwAAAAAAAAAAAAAAAAAAAAAA +AAAAAAAAAAAAAAAAAAA4N1hHEzgNAAgAEgAEBAQEEgAAUwAEGggAAAAICAgC +'; +SET @binlog_fragment_0=' +TFtYRxMBAAAAKQAAAH8BAAAAABAAAAAAAAAABHRlc3QAAnQxAAEDAAE= +TFtYRxcBAAAAIgAAAKEBAAAQABAAAAAAAAEAAf/+AwAAAA== +'; +SET @binlog_fragment_1=''; +BINLOG @binlog_fragment_0, @binlog_fragment_1; +select * from t1; +a +1 +1 +3 +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL','NULL'; +@binlog_fragment_0 NULL NULL +NULL NULL NULL ==== Test --base64-output=never on a binlog with row events ==== /*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/; /*!40019 SET @@session.max_insert_delayed_threads=0*/; diff --git a/mysql-test/suite/binlog/r/binlog_mysqlbinlog_row_frag.result b/mysql-test/suite/binlog/r/binlog_mysqlbinlog_row_frag.result new file mode 100644 index 00000000000..041be5ed09f --- /dev/null +++ b/mysql-test/suite/binlog/r/binlog_mysqlbinlog_row_frag.result @@ -0,0 +1,24 @@ +CREATE TABLE t (a TEXT); +RESET MASTER; +INSERT INTO t SET a=repeat('a', 1024); +SELECT a from t into @a; +FLUSH LOGS; +DELETE FROM t; +FOUND /BINLOG @binlog_fragment_0, @binlog_fragment_1/ in mysqlbinlog.sql +SELECT a LIKE @a as 'true' FROM t; +true +1 +BINLOG number-of-fragments must be exactly two +BINLOG @binlog_fragment; +ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near '' at line 1 +BINLOG @binlog_fragment, @binlog_fragment, @binlog_fragment; +ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near ' @binlog_fragment' at line 1 +SET @binlog_fragment_0='012345'; +SET @binlog_fragment_1='012345'; +BINLOG @binlog_fragment_0, @binlog_fragment_1; +ERROR 42000: You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use +SET @binlog_fragment_0='012345'; +BINLOG @binlog_fragment_0, @binlog_fragment_not_exist; +ERROR 42000: Incorrect argument type to variable 'binlog_fragment_not_exist' +# Cleanup +DROP TABLE t; diff --git a/mysql-test/suite/binlog/t/binlog_base64_flag.test b/mysql-test/suite/binlog/t/binlog_base64_flag.test index f8333315088..575a7307665 100644 --- a/mysql-test/suite/binlog/t/binlog_base64_flag.test +++ b/mysql-test/suite/binlog/t/binlog_base64_flag.test @@ -67,6 +67,28 @@ TFtYRxcBAAAAIgAAAKEBAAAQABAAAAAAAAEAAf/+AwAAAA== # The above line should succeed and 3 should be in the table select * from t1; +# The same as above with one-fragment BINLOG to prove +# equivalency with the fragmented BINLOG @frag_0, @frag_1. +DELETE FROM t1 WHERE a=3; +# This is a binlog statement containing a Format_description_log_event +# from the same version as the Table_map and Write_rows_log_event. +BINLOG ' +ODdYRw8BAAAAZgAAAGoAAAABAAQANS4xLjIzLXJjLWRlYnVnLWxvZwAAAAAAAAAAAAAAAAAAAAAA +AAAAAAAAAAAAAAAAAAA4N1hHEzgNAAgAEgAEBAQEEgAAUwAEGggAAAAICAgC +'; + +# This is a Table_map_log_event+Write_rows_log_event corresponding to: +# INSERT INTO TABLE test.t1 VALUES (3) +SET @binlog_fragment_0=' +TFtYRxMBAAAAKQAAAH8BAAAAABAAAAAAAAAABHRlc3QAAnQxAAEDAAE= +TFtYRxcBAAAAIgAAAKEBAAAQABAAAAAAAAEAAf/+AwAAAA== +'; +SET @binlog_fragment_1=''; +BINLOG @binlog_fragment_0, @binlog_fragment_1; +# The above line should succeed and 3 should be in the table: +select * from t1; +# show "one-shot" feature of binlog_fragment variables +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL','NULL'; # Test that mysqlbinlog stops with an error message when the # --base64-output=never flag is used on a binlog with base64 events. diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test new file mode 100644 index 00000000000..f0317ef1219 --- /dev/null +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test @@ -0,0 +1,45 @@ +--source include/have_debug.inc +--source include/have_binlog_format_row.inc + +--let $MYSQLD_DATADIR= `select @@datadir` + +CREATE TABLE t (a TEXT); +# events of interest are guaranteed to stay in 000001 log +RESET MASTER; +--eval INSERT INTO t SET a=repeat('a', 1024) +SELECT a from t into @a; +FLUSH LOGS; +DELETE FROM t; + +--exec $MYSQL_BINLOG --debug-binlog-row-event-max-encoded-size=256 $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql + +--let SEARCH_PATTERN= BINLOG @binlog_fragment_0, @binlog_fragment_1 +--let SEARCH_FILE= $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql +--source include/search_pattern_in_file.inc + +--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql + +SELECT a LIKE @a as 'true' FROM t; + +# improper syntax error +--echo BINLOG number-of-fragments must be exactly two +--error ER_PARSE_ERROR +BINLOG @binlog_fragment; +--error ER_PARSE_ERROR +BINLOG @binlog_fragment, @binlog_fragment, @binlog_fragment; + +# corrupted fragments error check (to the expected error code notice, +# the same error code occurs in a similar unfragmented case) +SET @binlog_fragment_0='012345'; +SET @binlog_fragment_1='012345'; +--error ER_SYNTAX_ERROR +BINLOG @binlog_fragment_0, @binlog_fragment_1; + +# Not existing fragment is not allowed +SET @binlog_fragment_0='012345'; +--error ER_WRONG_TYPE_FOR_VAR +BINLOG @binlog_fragment_0, @binlog_fragment_not_exist; + +--echo # Cleanup +--remove_file $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql +DROP TABLE t; diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c index fa3b6e672d7..f3877331664 100644 --- a/mysys/mf_iocache2.c +++ b/mysys/mf_iocache2.c @@ -23,51 +23,56 @@ #include <stdarg.h> #include <m_ctype.h> -/* - Copy contents of an IO_CACHE to a file. - - SYNOPSIS - my_b_copy_to_file() - cache IO_CACHE to copy from - file File to copy to - - DESCRIPTION - Copy the contents of the cache to the file. The cache will be - re-inited to a read cache and will read from the beginning of the - cache. - - If a failure to write fully occurs, the cache is only copied - partially. +/** + Copy the cache to the file. Copying can be constrained to @c count + number of bytes when the parameter is less than SIZE_T_MAX. The + cache will be optionally re-inited to a read cache and will read + from the beginning of the cache. If a failure to write fully + occurs, the cache is only copied partially. TODO - Make this function solid by handling partial reads from the cache - in a correct manner: it should be atomic. - - RETURN VALUE - 0 All OK - 1 An error occurred + Make this function solid by handling partial reads from the cache + in a correct manner: it should be atomic. + + @param cache IO_CACHE to copy from + @param file File to copy to + @param count the copied size or the max of the type + when the whole cache is to be copied. + @return + 0 All OK + 1 An error occurred */ int -my_b_copy_to_file(IO_CACHE *cache, FILE *file) +my_b_copy_to_file(IO_CACHE *cache, FILE *file, + size_t count) { - size_t bytes_in_cache; + size_t curr_write, bytes_in_cache; DBUG_ENTER("my_b_copy_to_file"); - /* Reinit the cache to read from the beginning of the cache */ - if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE)) - DBUG_RETURN(1); bytes_in_cache= my_b_bytes_in_cache(cache); do { - if (my_fwrite(file, cache->read_pos, bytes_in_cache, + curr_write= MY_MIN(bytes_in_cache, count); + if (my_fwrite(file, cache->read_pos, curr_write, MYF(MY_WME | MY_NABP)) == (size_t) -1) DBUG_RETURN(1); - } while ((bytes_in_cache= my_b_fill(cache))); + + cache->read_pos += curr_write; + count -= curr_write; + } while (count && (bytes_in_cache= my_b_fill(cache))); if(cache->error == -1) DBUG_RETURN(1); DBUG_RETURN(0); } +int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file) +{ + DBUG_ENTER("my_b_copy_all_to_file"); + /* Reinit the cache to read from the beginning of the cache */ + if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE)) + DBUG_RETURN(1); + DBUG_RETURN(my_b_copy_to_file(cache, file, SIZE_T_MAX)); +} my_off_t my_b_append_tell(IO_CACHE* info) { diff --git a/sql/item_func.cc b/sql/item_func.cc index 8b1c7b3bc61..169eb76d802 100644 --- a/sql/item_func.cc +++ b/sql/item_func.cc @@ -4828,7 +4828,7 @@ bool Item_func_set_user_var::register_field_in_bitmap(uchar *arg) true failure */ -static bool +bool update_hash(user_var_entry *entry, bool set_null, void *ptr, uint length, Item_result type, CHARSET_INFO *cs, bool unsigned_arg) diff --git a/sql/item_func.h b/sql/item_func.h index 2c0e3a62f6a..e3eab02f213 100644 --- a/sql/item_func.h +++ b/sql/item_func.h @@ -2283,4 +2283,8 @@ bool eval_const_cond(COND *cond); extern bool volatile mqh_used; +bool update_hash(user_var_entry *entry, bool set_null, void *ptr, uint length, + Item_result type, CHARSET_INFO *cs, + bool unsigned_arg); + #endif /* ITEM_FUNC_INCLUDED */ diff --git a/sql/log_event.cc b/sql/log_event.cc index 883f1863ac4..1369ba2d687 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -2783,9 +2783,23 @@ void free_table_map_log_event(Table_map_log_event *event) delete event; } +/** + Encode the event, optionally per 'do_print_encoded' arg store the + result into the argument cache; optionally per event_info's + 'verbose' print into the cache a verbose representation of the event. + Note, no extra wrapping is done to the being io-cached data, like + to producing a BINLOG query. It's left for a routine that extracts from + the cache. + + @param file pointer to IO_CACHE + @param print_event_info pointer to print_event_info specializing + what out of and how to print the event + @param do_print_encoded whether to store base64-encoded event + into @file. +*/ void Log_event::print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, - bool more) + bool do_print_encoded) { const uchar *ptr= (const uchar *)temp_buf; uint32 size= uint4korr(ptr + EVENT_LEN_OFFSET); @@ -2804,17 +2818,9 @@ void Log_event::print_base64(IO_CACHE* file, DBUG_ASSERT(0); } - if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) - { - if (my_b_tell(file) == 0) - my_b_write_string(file, "\nBINLOG '\n"); - + if (do_print_encoded) my_b_printf(file, "%s\n", tmp_str); - if (!more) - my_b_printf(file, "'%s\n", print_event_info->delimiter); - } - if (print_event_info->verbose) { Rows_log_event *ev= NULL; @@ -4851,9 +4857,17 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER && !print_event_info->short_form) { - if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) + /* BINLOG is matched with the delimiter below on the same level */ + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS; + if (do_print_encoded) my_b_printf(&cache, "BINLOG '\n"); - print_base64(&cache, print_event_info, FALSE); + + print_base64(&cache, print_event_info, do_print_encoded); + + if (do_print_encoded) + my_b_printf(&cache, "'%s\n", print_event_info->delimiter); + print_event_info->printed_fd_event= TRUE; } DBUG_VOID_RETURN; @@ -10491,12 +10505,128 @@ void Rows_log_event::pack_info(Protocol *protocol) #endif #ifdef MYSQL_CLIENT +/** + Print an event "body" cache to @c file possibly in two fragments. + Each fragement is optionally per @c do_wrap to produce an SQL statement. + + @param file a file to print to + @param body the "body" IO_CACHE of event + @param do_wrap whether to wrap base64-encoded strings with + SQL cover. + @param delimiter delimiter string + + The function signals on any error through setting @c body->error to -1. +*/ +void copy_cache_to_file_wrapped(FILE *file, + IO_CACHE *body, + bool do_wrap, + const char *delimiter) +{ + const char str_binlog[]= "\nBINLOG '\n"; + const char fmt_delim[]= "'%s\n"; + const char fmt_n_delim[]= "\n'%s"; + const my_off_t cache_size= my_b_tell(body); + + if (reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE)) + { + body->error= -1; + goto end; + } + + if (!do_wrap) + { + my_b_copy_to_file(body, file, SIZE_T_MAX); + } + else if (4 + sizeof(str_binlog) + cache_size + sizeof(fmt_delim) > + opt_binlog_rows_event_max_encoded_size) + { + /* + 2 fragments can always represent near 1GB row-based + base64-encoded event as two strings each of size less than + max(max_allowed_packet). Greater number of fragments does not + save from potential need to tweak (increase) @@max_allowed_packet + before to process the fragments. So 2 is safe and enough. + + Split the big query when its packet size's estimation exceeds a + limit. The estimate includes the maximum packet header + contribution of non-compressed packet. + */ + const char fmt_frag[]= "\nSET @binlog_fragment_%d ='\n"; + + my_fprintf(file, fmt_frag, 0); + if (my_b_copy_to_file(body, file, cache_size/2 + 1)) + { + body->error= -1; + goto end; + } + my_fprintf(file, fmt_n_delim, delimiter); + + my_fprintf(file, fmt_frag, 1); + if (my_b_copy_to_file(body, file, SIZE_T_MAX)) + { + body->error= -1; + goto end; + } + my_fprintf(file, fmt_delim, delimiter); + + my_fprintf(file, "BINLOG @binlog_fragment_0, @binlog_fragment_1%s\n", + delimiter); + } + else + { + my_fprintf(file, str_binlog); + if (my_b_copy_to_file(body, file, SIZE_T_MAX)) + { + body->error= -1; + goto end; + } + my_fprintf(file, fmt_delim, delimiter); + } + reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE); + +end: + return; +} + +/** + The function invokes base64 encoder to run on the current + event string and store the result into two caches. + When the event ends the current statement the caches are is copied into + the argument file. + Copying is also concerned how to wrap the event, specifically to produce + a valid SQL syntax. + When the encoded data size is within max(MAX_ALLOWED_PACKET) + a regular BINLOG query is composed. Otherwise it is build as fragmented + + SET @binlog_fragment_0='...'; + SET @binlog_fragment_1='...'; + BINLOG @binlog_fragment_0, @binlog_fragment_1; + + where fragments are represented by a pair of indexed user + "one shot" variables. + + @note + If any changes made don't forget to duplicate them to + Old_rows_log_event as long as it's supported. + + @param file pointer to IO_CACHE + @param print_event_info pointer to print_event_info specializing + what out of and how to print the event + @param name the name of a table that the event operates on + + The function signals on any error of cache access through setting + that cache's @c error to -1. +*/ void Rows_log_event::print_helper(FILE *file, PRINT_EVENT_INFO *print_event_info, char const *const name) { IO_CACHE *const head= &print_event_info->head_cache; IO_CACHE *const body= &print_event_info->body_cache; + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && + !print_event_info->short_form; + if (!print_event_info->short_form) { bool const last_stmt_event= get_flags(STMT_END_F); @@ -10504,13 +10634,18 @@ void Rows_log_event::print_helper(FILE *file, my_b_printf(head, "\t%s: table id %lu%s\n", name, m_table_id, last_stmt_event ? " flags: STMT_END_F" : ""); - print_base64(body, print_event_info, !last_stmt_event); + print_base64(body, print_event_info, do_print_encoded); } if (get_flags(STMT_END_F)) { - copy_event_cache_to_file_and_reinit(head, file); - copy_event_cache_to_file_and_reinit(body, file); + if (copy_event_cache_to_file_and_reinit(head, file)) + { + head->error= -1; + return; + } + copy_cache_to_file_wrapped(file, body, do_print_encoded, + print_event_info->delimiter); } } #endif @@ -11379,7 +11514,9 @@ void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) m_dbnam, m_tblnam, m_table_id, ((m_flags & TM_BIT_HAS_TRIGGERS_F) ? " (has triggers)" : "")); - print_base64(&print_event_info->body_cache, print_event_info, TRUE); + print_base64(&print_event_info->body_cache, print_event_info, + print_event_info->base64_output_mode != + BASE64_OUTPUT_DECODE_ROWS); copy_event_cache_to_file_and_reinit(&print_event_info->head_cache, file); } } diff --git a/sql/log_event.h b/sql/log_event.h index 90900f63533..446bd8cb827 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -1157,7 +1157,7 @@ public: void print_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, bool is_more); void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, - bool is_more); + bool do_print_encoded); #endif /* read_log_event() functions read an event from a binlog or relay @@ -4891,15 +4891,22 @@ public: virtual int get_data_size() { return IGNORABLE_HEADER_LEN; } }; +#ifdef MYSQL_CLIENT +void copy_cache_to_file_wrapped(FILE *file, + PRINT_EVENT_INFO *print_event_info, + IO_CACHE *body, + bool do_wrap); +#endif static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache, FILE *file) { - return - my_b_copy_to_file(cache, file) || + return + my_b_copy_all_to_file(cache, file) || reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE); } + #ifdef MYSQL_SERVER /***************************************************************************** diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index d2b4470bbf9..a6f2ed3f416 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -1850,12 +1850,17 @@ void Old_rows_log_event::pack_info(Protocol *protocol) #ifdef MYSQL_CLIENT +/* Method duplicates Rows_log_event's one */ void Old_rows_log_event::print_helper(FILE *file, PRINT_EVENT_INFO *print_event_info, char const *const name) { IO_CACHE *const head= &print_event_info->head_cache; IO_CACHE *const body= &print_event_info->body_cache; + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && + !print_event_info->short_form; + if (!print_event_info->short_form) { bool const last_stmt_event= get_flags(STMT_END_F); @@ -1863,13 +1868,18 @@ void Old_rows_log_event::print_helper(FILE *file, my_b_printf(head, "\t%s: table id %lu%s\n", name, m_table_id, last_stmt_event ? " flags: STMT_END_F" : ""); - print_base64(body, print_event_info, !last_stmt_event); + print_base64(body, print_event_info, do_print_encoded); } if (get_flags(STMT_END_F)) { - copy_event_cache_to_file_and_reinit(head, file); - copy_event_cache_to_file_and_reinit(body, file); + if (copy_event_cache_to_file_and_reinit(head, file)) + { + head->error= -1; + return; + } + copy_cache_to_file_wrapped(file, body, do_print_encoded, + print_event_info->delimiter); } } #endif diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 91cf038907e..2e861d00f10 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -28,6 +28,65 @@ // START_EVENT_V3, // Log_event_type, // Log_event + +/** + Copy fragments into the standard placeholder thd->lex->comment.str. + + Compute the size of the (still) encoded total, + allocate and then copy fragments one after another. + The size can exceed max(max_allowed_packet) which is not a + problem as no String instance is created off this char array. + + @param thd THD handle + @return + 0 at success, + -1 otherwise. +*/ +int binlog_defragment(THD *thd) +{ + user_var_entry *entry[2]; + LEX_STRING name[2]= { thd->lex->comment, thd->lex->ident }; + + /* compute the total size */ + thd->lex->comment.str= NULL; + thd->lex->comment.length= 0; + for (uint k= 0; k < 2; k++) + { + entry[k]= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str, + name[k].length); + if (!entry[k] || entry[k]->type != STRING_RESULT) + { + my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str); + return -1; + } + thd->lex->comment.length += entry[k]->length; + } + + thd->lex->comment.str= // to be freed by the caller + (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME)); + if (!thd->lex->comment.str) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + return -1; + } + + /* fragments are merged into allocated buf while the user var:s get reset */ + size_t gathered_length= 0; + for (uint k=0; k < 2; k++) + { + memcpy(thd->lex->comment.str + gathered_length, entry[k]->value, + entry[k]->length); + gathered_length += entry[k]->length; + update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0); + } + + DBUG_ASSERT(gathered_length == thd->lex->comment.length); + + return 0; +} + + /** Execute a BINLOG statement. @@ -53,14 +112,6 @@ void mysql_client_binlog_statement(THD* thd) if (check_global_access(thd, SUPER_ACL)) DBUG_VOID_RETURN; - size_t coded_len= thd->lex->comment.length; - if (!coded_len) - { - my_error(ER_SYNTAX_ERROR, MYF(0)); - DBUG_VOID_RETURN; - } - size_t decoded_len= base64_needed_decoded_length(coded_len); - /* option_bits will be changed when applying the event. But we don't expect it be changed permanently after BINLOG statement, so backup it first. @@ -81,6 +132,8 @@ void mysql_client_binlog_statement(THD* thd) int err; Relay_log_info *rli; rpl_group_info *rgi; + char *buf= NULL; + size_t coded_len= 0, decoded_len= 0; rli= thd->rli_fake; if (!rli) @@ -102,15 +155,13 @@ void mysql_client_binlog_statement(THD* thd) rgi->thd= thd; const char *error= 0; - char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME)); Log_event *ev = 0; + my_bool is_fragmented= FALSE; /* Out of memory check */ - if (!(rli && - rli->relay_log.description_event_for_exec && - buf)) + if (!(rli && rli->relay_log.description_event_for_exec)) { my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); /* needed 1 bytes */ goto end; @@ -119,6 +170,23 @@ void mysql_client_binlog_statement(THD* thd) rli->sql_driver_thd= thd; rli->no_storage= TRUE; + if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str)) + if (binlog_defragment(thd)) + goto end; + + if (!(coded_len= thd->lex->comment.length)) + { + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + decoded_len= base64_needed_decoded_length(coded_len); + if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + goto end; + } + for (char const *strptr= thd->lex->comment.str ; strptr < thd->lex->comment.str + thd->lex->comment.length ; ) { @@ -272,6 +340,8 @@ void mysql_client_binlog_statement(THD* thd) my_ok(thd); end: + if (unlikely(is_fragmented)) + my_free(thd->lex->comment.str); thd->variables.option_bits= thd_options; rgi->slave_close_thread_tables(thd); my_free(buf); diff --git a/sql/sql_lex.h b/sql/sql_lex.h index f1edc809579..edc647522d3 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -2459,6 +2459,10 @@ struct LEX: public Query_tables_list String *wild; /* Wildcard in SHOW {something} LIKE 'wild'*/ sql_exchange *exchange; select_result *result; + /** + @c the two may also hold BINLOG arguments: either comment holds a + base64-char string or both represent the BINLOG fragment user variables. + */ LEX_STRING comment, ident; LEX_USER *grant_user; XID *xid; diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 0b85b597baf..bfaa0a60a24 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -7950,8 +7950,17 @@ binlog_base64_event: { Lex->sql_command = SQLCOM_BINLOG_BASE64_EVENT; Lex->comment= $2; + Lex->ident.str= NULL; + Lex->ident.length= 0; } - ; + | + BINLOG_SYM '@' ident_or_text ',' '@' ident_or_text + { + Lex->sql_command = SQLCOM_BINLOG_BASE64_EVENT; + Lex->comment= $3; + Lex->ident= $6; + } + ; check_view_or_table: table_or_tables table_list opt_mi_check_type diff --git a/unittest/sql/mf_iocache-t.cc b/unittest/sql/mf_iocache-t.cc index 1eef8365074..fca5ec5014d 100644 --- a/unittest/sql/mf_iocache-t.cc +++ b/unittest/sql/mf_iocache-t.cc @@ -370,10 +370,77 @@ void mdev17133() } +void mdev10963() +{ + int res; + uint n_checks= 8; + uchar buf[1024 * 512]; + uint n_frag= sizeof(buf)/(2 * CACHE_SIZE); + FILE *file; + myf my_flags= MYF(MY_WME); + const char *file_name="cache.log"; + + memset(buf, FILL, sizeof(buf)); + diag("MDEV-10963 Fragmented BINLOG query"); + + init_io_cache_encryption(); + srand((uint) time(NULL)); + + /* copying source */ + res= open_cached_file(&info, 0, 0, CACHE_SIZE, 0); + ok(res == 0, "open_cached_file" INFO_TAIL); + res= my_b_write(&info, buf, sizeof(buf)); + + ulong total_size= my_b_tell(&info); + ok(res == 0 && total_size == sizeof(buf), "cache is written"); + + /* destination */ + file= my_fopen(file_name, O_RDWR | O_TRUNC | O_CREAT, my_flags); + ok(my_fileno(file) > 0, "opened file fd = %d", my_fileno(file)); + + /* + For n_checks times verify a sequence of copying with random fragment + size ranging from zero to about the double of the cache read buffer size. + */ + for (; n_checks; n_checks--, rewind(file)) + { + // copied size is an estimate can be incremeneted to greater than total_size + ulong copied_size= 0; + + res= reinit_io_cache(&info, READ_CACHE, 0L, FALSE, FALSE); + ok(res == 0, "cache turned to read"); + + for (ulong i= 0, curr_size= 0; i < n_frag; i++, copied_size += curr_size) + { + curr_size= rand() % (2 * (total_size - copied_size) / (n_frag - i)); + + DBUG_ASSERT(curr_size <= total_size - copied_size || i == n_frag - 1); + + res= my_b_copy_to_file(&info, file, curr_size); + ok(res == 0, "%lu of the cache copied to file", curr_size); + } + /* + Regardless of total_size <> copied_size the function succeeds: + when total_size < copied_size the huge overflowed value of the last + argument is ignored because nothing already left uncopied in the cache. + */ + res= my_b_copy_to_file(&info, file, total_size - copied_size); + ok(res == 0, "%lu of the cache copied to file", total_size - copied_size); + ok(my_ftell(file, my_flags) == sizeof(buf), + "file written in %d fragments", n_frag+1); + + res= reinit_io_cache(&info, WRITE_CACHE, total_size, 0, 0); + ok(res == 0 && my_b_tell(&info) == sizeof(buf), "cache turned to write"); + } + close_cached_file(&info); + my_fclose(file, my_flags); + my_delete(file_name, MYF(MY_WME)); +} + int main(int argc __attribute__((unused)),char *argv[]) { MY_INIT(argv[0]); - plan(114); + plan(277); /* temp files with and without encryption */ encrypt_tmp_files= 1; @@ -391,6 +458,7 @@ int main(int argc __attribute__((unused)),char *argv[]) mdev14014(); mdev17133(); + mdev10963(); my_end(0); return exit_status(); |