diff options
author | Andrei Elkin <andrei.elkin@mariadb.com> | 2018-07-27 22:55:18 +0300 |
---|---|---|
committer | Andrei Elkin <andrei.elkin@mariadb.com> | 2019-01-24 20:44:50 +0200 |
commit | 5d48ea7d07b481ae3930486b4b039e1454273190 (patch) | |
tree | 86e9be451b867ad4f4df061c18d0546744e45ee0 /sql | |
parent | f9ac7032cbc4b7b9af9c8122e6ebfd91af9fbaf9 (diff) | |
download | mariadb-git-5d48ea7d07b481ae3930486b4b039e1454273190.tar.gz |
MDEV-10963 Fragmented BINLOG query
The problem was originally stated in
http://bugs.mysql.com/bug.php?id=82212
The size of an base64-encoded Rows_log_event exceeds its
vanilla byte representation in 4/3 times.
When a binlogged event size is about 1GB mysqlbinlog generates
a BINLOG query that can't be send out due to its size.
It is fixed with fragmenting the BINLOG argument C-string into
(approximate) halves when the base64 encoded event is over 1GB size.
The mysqlbinlog in such case puts out
SET @binlog_fragment_0='base64-encoded-fragment_0';
SET @binlog_fragment_1='base64-encoded-fragment_1';
BINLOG @binlog_fragment_0, @binlog_fragment_1;
to represent a big BINLOG.
For prompt memory release BINLOG handler is made to reset the BINLOG argument
user variables in the middle of processing, as if @binlog_fragment_{0,1} = NULL
is assigned.
Notice the 2 fragments are enough, though the client and server still may
need to tweak their @@max_allowed_packet to satisfy to the fragment
size (which they would have to do anyway with greater number of
fragments, should that be desired).
On the lower level the following changes are made:
Log_event::print_base64()
remains to call encoder and store the encoded data into a cache but
now *without* doing any formatting. The latter is left for time
when the cache is copied to an output file (e.g mysqlbinlog output).
No formatting behavior is also reflected by the change in the meaning
of the last argument which specifies whether to cache the encoded data.
Rows_log_event::print_helper()
is made to invoke a specialized fragmented cache-to-file copying function
which is
copy_cache_to_file_wrapped()
that takes care of fragmenting also optionally wraps encoded
strings (fragments) into SQL stanzas.
my_b_copy_to_file()
is refactored to into my_b_copy_all_to_file(). The former function
is generalized
to accepts more a limit argument to constraint the copying and does
not reinitialize anymore the cache into reading mode.
The limit does not do any effect on the fully read cache.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/item_func.cc | 2 | ||||
-rw-r--r-- | sql/item_func.h | 4 | ||||
-rw-r--r-- | sql/log_event.cc | 169 | ||||
-rw-r--r-- | sql/log_event.h | 13 | ||||
-rw-r--r-- | sql/log_event_old.cc | 16 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 94 | ||||
-rw-r--r-- | sql/sql_lex.h | 4 | ||||
-rw-r--r-- | sql/sql_yacc.yy | 11 |
8 files changed, 277 insertions, 36 deletions
diff --git a/sql/item_func.cc b/sql/item_func.cc index 8b1c7b3bc61..169eb76d802 100644 --- a/sql/item_func.cc +++ b/sql/item_func.cc @@ -4828,7 +4828,7 @@ bool Item_func_set_user_var::register_field_in_bitmap(uchar *arg) true failure */ -static bool +bool update_hash(user_var_entry *entry, bool set_null, void *ptr, uint length, Item_result type, CHARSET_INFO *cs, bool unsigned_arg) diff --git a/sql/item_func.h b/sql/item_func.h index 2c0e3a62f6a..e3eab02f213 100644 --- a/sql/item_func.h +++ b/sql/item_func.h @@ -2283,4 +2283,8 @@ bool eval_const_cond(COND *cond); extern bool volatile mqh_used; +bool update_hash(user_var_entry *entry, bool set_null, void *ptr, uint length, + Item_result type, CHARSET_INFO *cs, + bool unsigned_arg); + #endif /* ITEM_FUNC_INCLUDED */ diff --git a/sql/log_event.cc b/sql/log_event.cc index 883f1863ac4..1369ba2d687 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -2783,9 +2783,23 @@ void free_table_map_log_event(Table_map_log_event *event) delete event; } +/** + Encode the event, optionally per 'do_print_encoded' arg store the + result into the argument cache; optionally per event_info's + 'verbose' print into the cache a verbose representation of the event. + Note, no extra wrapping is done to the being io-cached data, like + to producing a BINLOG query. It's left for a routine that extracts from + the cache. + + @param file pointer to IO_CACHE + @param print_event_info pointer to print_event_info specializing + what out of and how to print the event + @param do_print_encoded whether to store base64-encoded event + into @file. +*/ void Log_event::print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, - bool more) + bool do_print_encoded) { const uchar *ptr= (const uchar *)temp_buf; uint32 size= uint4korr(ptr + EVENT_LEN_OFFSET); @@ -2804,17 +2818,9 @@ void Log_event::print_base64(IO_CACHE* file, DBUG_ASSERT(0); } - if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) - { - if (my_b_tell(file) == 0) - my_b_write_string(file, "\nBINLOG '\n"); - + if (do_print_encoded) my_b_printf(file, "%s\n", tmp_str); - if (!more) - my_b_printf(file, "'%s\n", print_event_info->delimiter); - } - if (print_event_info->verbose) { Rows_log_event *ev= NULL; @@ -4851,9 +4857,17 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER && !print_event_info->short_form) { - if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) + /* BINLOG is matched with the delimiter below on the same level */ + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS; + if (do_print_encoded) my_b_printf(&cache, "BINLOG '\n"); - print_base64(&cache, print_event_info, FALSE); + + print_base64(&cache, print_event_info, do_print_encoded); + + if (do_print_encoded) + my_b_printf(&cache, "'%s\n", print_event_info->delimiter); + print_event_info->printed_fd_event= TRUE; } DBUG_VOID_RETURN; @@ -10491,12 +10505,128 @@ void Rows_log_event::pack_info(Protocol *protocol) #endif #ifdef MYSQL_CLIENT +/** + Print an event "body" cache to @c file possibly in two fragments. + Each fragement is optionally per @c do_wrap to produce an SQL statement. + + @param file a file to print to + @param body the "body" IO_CACHE of event + @param do_wrap whether to wrap base64-encoded strings with + SQL cover. + @param delimiter delimiter string + + The function signals on any error through setting @c body->error to -1. +*/ +void copy_cache_to_file_wrapped(FILE *file, + IO_CACHE *body, + bool do_wrap, + const char *delimiter) +{ + const char str_binlog[]= "\nBINLOG '\n"; + const char fmt_delim[]= "'%s\n"; + const char fmt_n_delim[]= "\n'%s"; + const my_off_t cache_size= my_b_tell(body); + + if (reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE)) + { + body->error= -1; + goto end; + } + + if (!do_wrap) + { + my_b_copy_to_file(body, file, SIZE_T_MAX); + } + else if (4 + sizeof(str_binlog) + cache_size + sizeof(fmt_delim) > + opt_binlog_rows_event_max_encoded_size) + { + /* + 2 fragments can always represent near 1GB row-based + base64-encoded event as two strings each of size less than + max(max_allowed_packet). Greater number of fragments does not + save from potential need to tweak (increase) @@max_allowed_packet + before to process the fragments. So 2 is safe and enough. + + Split the big query when its packet size's estimation exceeds a + limit. The estimate includes the maximum packet header + contribution of non-compressed packet. + */ + const char fmt_frag[]= "\nSET @binlog_fragment_%d ='\n"; + + my_fprintf(file, fmt_frag, 0); + if (my_b_copy_to_file(body, file, cache_size/2 + 1)) + { + body->error= -1; + goto end; + } + my_fprintf(file, fmt_n_delim, delimiter); + + my_fprintf(file, fmt_frag, 1); + if (my_b_copy_to_file(body, file, SIZE_T_MAX)) + { + body->error= -1; + goto end; + } + my_fprintf(file, fmt_delim, delimiter); + + my_fprintf(file, "BINLOG @binlog_fragment_0, @binlog_fragment_1%s\n", + delimiter); + } + else + { + my_fprintf(file, str_binlog); + if (my_b_copy_to_file(body, file, SIZE_T_MAX)) + { + body->error= -1; + goto end; + } + my_fprintf(file, fmt_delim, delimiter); + } + reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE); + +end: + return; +} + +/** + The function invokes base64 encoder to run on the current + event string and store the result into two caches. + When the event ends the current statement the caches are is copied into + the argument file. + Copying is also concerned how to wrap the event, specifically to produce + a valid SQL syntax. + When the encoded data size is within max(MAX_ALLOWED_PACKET) + a regular BINLOG query is composed. Otherwise it is build as fragmented + + SET @binlog_fragment_0='...'; + SET @binlog_fragment_1='...'; + BINLOG @binlog_fragment_0, @binlog_fragment_1; + + where fragments are represented by a pair of indexed user + "one shot" variables. + + @note + If any changes made don't forget to duplicate them to + Old_rows_log_event as long as it's supported. + + @param file pointer to IO_CACHE + @param print_event_info pointer to print_event_info specializing + what out of and how to print the event + @param name the name of a table that the event operates on + + The function signals on any error of cache access through setting + that cache's @c error to -1. +*/ void Rows_log_event::print_helper(FILE *file, PRINT_EVENT_INFO *print_event_info, char const *const name) { IO_CACHE *const head= &print_event_info->head_cache; IO_CACHE *const body= &print_event_info->body_cache; + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && + !print_event_info->short_form; + if (!print_event_info->short_form) { bool const last_stmt_event= get_flags(STMT_END_F); @@ -10504,13 +10634,18 @@ void Rows_log_event::print_helper(FILE *file, my_b_printf(head, "\t%s: table id %lu%s\n", name, m_table_id, last_stmt_event ? " flags: STMT_END_F" : ""); - print_base64(body, print_event_info, !last_stmt_event); + print_base64(body, print_event_info, do_print_encoded); } if (get_flags(STMT_END_F)) { - copy_event_cache_to_file_and_reinit(head, file); - copy_event_cache_to_file_and_reinit(body, file); + if (copy_event_cache_to_file_and_reinit(head, file)) + { + head->error= -1; + return; + } + copy_cache_to_file_wrapped(file, body, do_print_encoded, + print_event_info->delimiter); } } #endif @@ -11379,7 +11514,9 @@ void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) m_dbnam, m_tblnam, m_table_id, ((m_flags & TM_BIT_HAS_TRIGGERS_F) ? " (has triggers)" : "")); - print_base64(&print_event_info->body_cache, print_event_info, TRUE); + print_base64(&print_event_info->body_cache, print_event_info, + print_event_info->base64_output_mode != + BASE64_OUTPUT_DECODE_ROWS); copy_event_cache_to_file_and_reinit(&print_event_info->head_cache, file); } } diff --git a/sql/log_event.h b/sql/log_event.h index 90900f63533..446bd8cb827 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -1157,7 +1157,7 @@ public: void print_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, bool is_more); void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, - bool is_more); + bool do_print_encoded); #endif /* read_log_event() functions read an event from a binlog or relay @@ -4891,15 +4891,22 @@ public: virtual int get_data_size() { return IGNORABLE_HEADER_LEN; } }; +#ifdef MYSQL_CLIENT +void copy_cache_to_file_wrapped(FILE *file, + PRINT_EVENT_INFO *print_event_info, + IO_CACHE *body, + bool do_wrap); +#endif static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache, FILE *file) { - return - my_b_copy_to_file(cache, file) || + return + my_b_copy_all_to_file(cache, file) || reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE); } + #ifdef MYSQL_SERVER /***************************************************************************** diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index d2b4470bbf9..a6f2ed3f416 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -1850,12 +1850,17 @@ void Old_rows_log_event::pack_info(Protocol *protocol) #ifdef MYSQL_CLIENT +/* Method duplicates Rows_log_event's one */ void Old_rows_log_event::print_helper(FILE *file, PRINT_EVENT_INFO *print_event_info, char const *const name) { IO_CACHE *const head= &print_event_info->head_cache; IO_CACHE *const body= &print_event_info->body_cache; + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && + !print_event_info->short_form; + if (!print_event_info->short_form) { bool const last_stmt_event= get_flags(STMT_END_F); @@ -1863,13 +1868,18 @@ void Old_rows_log_event::print_helper(FILE *file, my_b_printf(head, "\t%s: table id %lu%s\n", name, m_table_id, last_stmt_event ? " flags: STMT_END_F" : ""); - print_base64(body, print_event_info, !last_stmt_event); + print_base64(body, print_event_info, do_print_encoded); } if (get_flags(STMT_END_F)) { - copy_event_cache_to_file_and_reinit(head, file); - copy_event_cache_to_file_and_reinit(body, file); + if (copy_event_cache_to_file_and_reinit(head, file)) + { + head->error= -1; + return; + } + copy_cache_to_file_wrapped(file, body, do_print_encoded, + print_event_info->delimiter); } } #endif diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 91cf038907e..2e861d00f10 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -28,6 +28,65 @@ // START_EVENT_V3, // Log_event_type, // Log_event + +/** + Copy fragments into the standard placeholder thd->lex->comment.str. + + Compute the size of the (still) encoded total, + allocate and then copy fragments one after another. + The size can exceed max(max_allowed_packet) which is not a + problem as no String instance is created off this char array. + + @param thd THD handle + @return + 0 at success, + -1 otherwise. +*/ +int binlog_defragment(THD *thd) +{ + user_var_entry *entry[2]; + LEX_STRING name[2]= { thd->lex->comment, thd->lex->ident }; + + /* compute the total size */ + thd->lex->comment.str= NULL; + thd->lex->comment.length= 0; + for (uint k= 0; k < 2; k++) + { + entry[k]= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str, + name[k].length); + if (!entry[k] || entry[k]->type != STRING_RESULT) + { + my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str); + return -1; + } + thd->lex->comment.length += entry[k]->length; + } + + thd->lex->comment.str= // to be freed by the caller + (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME)); + if (!thd->lex->comment.str) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + return -1; + } + + /* fragments are merged into allocated buf while the user var:s get reset */ + size_t gathered_length= 0; + for (uint k=0; k < 2; k++) + { + memcpy(thd->lex->comment.str + gathered_length, entry[k]->value, + entry[k]->length); + gathered_length += entry[k]->length; + update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0); + } + + DBUG_ASSERT(gathered_length == thd->lex->comment.length); + + return 0; +} + + /** Execute a BINLOG statement. @@ -53,14 +112,6 @@ void mysql_client_binlog_statement(THD* thd) if (check_global_access(thd, SUPER_ACL)) DBUG_VOID_RETURN; - size_t coded_len= thd->lex->comment.length; - if (!coded_len) - { - my_error(ER_SYNTAX_ERROR, MYF(0)); - DBUG_VOID_RETURN; - } - size_t decoded_len= base64_needed_decoded_length(coded_len); - /* option_bits will be changed when applying the event. But we don't expect it be changed permanently after BINLOG statement, so backup it first. @@ -81,6 +132,8 @@ void mysql_client_binlog_statement(THD* thd) int err; Relay_log_info *rli; rpl_group_info *rgi; + char *buf= NULL; + size_t coded_len= 0, decoded_len= 0; rli= thd->rli_fake; if (!rli) @@ -102,15 +155,13 @@ void mysql_client_binlog_statement(THD* thd) rgi->thd= thd; const char *error= 0; - char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME)); Log_event *ev = 0; + my_bool is_fragmented= FALSE; /* Out of memory check */ - if (!(rli && - rli->relay_log.description_event_for_exec && - buf)) + if (!(rli && rli->relay_log.description_event_for_exec)) { my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); /* needed 1 bytes */ goto end; @@ -119,6 +170,23 @@ void mysql_client_binlog_statement(THD* thd) rli->sql_driver_thd= thd; rli->no_storage= TRUE; + if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str)) + if (binlog_defragment(thd)) + goto end; + + if (!(coded_len= thd->lex->comment.length)) + { + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + decoded_len= base64_needed_decoded_length(coded_len); + if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); + goto end; + } + for (char const *strptr= thd->lex->comment.str ; strptr < thd->lex->comment.str + thd->lex->comment.length ; ) { @@ -272,6 +340,8 @@ void mysql_client_binlog_statement(THD* thd) my_ok(thd); end: + if (unlikely(is_fragmented)) + my_free(thd->lex->comment.str); thd->variables.option_bits= thd_options; rgi->slave_close_thread_tables(thd); my_free(buf); diff --git a/sql/sql_lex.h b/sql/sql_lex.h index f1edc809579..edc647522d3 100644 --- a/sql/sql_lex.h +++ b/sql/sql_lex.h @@ -2459,6 +2459,10 @@ struct LEX: public Query_tables_list String *wild; /* Wildcard in SHOW {something} LIKE 'wild'*/ sql_exchange *exchange; select_result *result; + /** + @c the two may also hold BINLOG arguments: either comment holds a + base64-char string or both represent the BINLOG fragment user variables. + */ LEX_STRING comment, ident; LEX_USER *grant_user; XID *xid; diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 0b85b597baf..bfaa0a60a24 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -7950,8 +7950,17 @@ binlog_base64_event: { Lex->sql_command = SQLCOM_BINLOG_BASE64_EVENT; Lex->comment= $2; + Lex->ident.str= NULL; + Lex->ident.length= 0; } - ; + | + BINLOG_SYM '@' ident_or_text ',' '@' ident_or_text + { + Lex->sql_command = SQLCOM_BINLOG_BASE64_EVENT; + Lex->comment= $3; + Lex->ident= $6; + } + ; check_view_or_table: table_or_tables table_list opt_mi_check_type |