diff options
author | Oleksandr Byelkin <sanja@mariadb.com> | 2019-02-21 14:40:52 +0100 |
---|---|---|
committer | Oleksandr Byelkin <sanja@mariadb.com> | 2019-02-21 14:40:52 +0100 |
commit | 93ac7ae70ff000353538f732899b421a3f2ea7ce (patch) | |
tree | 819b5ca057d80b42699de219c982b7924857c406 /sql/log_event.cc | |
parent | 4932aba921755cfbc351b92c67068a5c48d3922b (diff) | |
parent | a40de1bdeb218d66d5cc737758a4bab1b06f255d (diff) | |
download | mariadb-git-93ac7ae70ff000353538f732899b421a3f2ea7ce.tar.gz |
Merge branch '10.3' into 10.4
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 386 |
1 files changed, 326 insertions, 60 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index bbadda3167e..70f0e6c2623 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -3685,9 +3685,23 @@ void free_table_map_log_event(Table_map_log_event *event) delete event; } +/** + Encode the event, optionally per 'do_print_encoded' arg store the + result into the argument cache; optionally per event_info's + 'verbose' print into the cache a verbose representation of the event. + Note, no extra wrapping is done to the being io-cached data, like + to producing a BINLOG query. It's left for a routine that extracts from + the cache. + + @param file pointer to IO_CACHE + @param print_event_info pointer to print_event_info specializing + what out of and how to print the event + @param do_print_encoded whether to store base64-encoded event + into @file. +*/ bool Log_event::print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, - bool more) + bool do_print_encoded) { uchar *ptr= (uchar *)temp_buf; uint32 size= uint4korr(ptr + EVENT_LEN_OFFSET); @@ -3738,12 +3752,9 @@ bool Log_event::print_base64(IO_CACHE* file, delete ev; } - if (print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER && - print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && - ! print_event_info->short_form) + if (do_print_encoded) { size_t const tmp_str_sz= my_base64_needed_encoded_length((int) size); - bool error= 0; char *tmp_str; if (!(tmp_str= (char *) my_malloc(tmp_str_sz, MYF(MY_WME)))) goto err; @@ -3753,17 +3764,8 @@ bool Log_event::print_base64(IO_CACHE* file, DBUG_ASSERT(0); } - if (my_b_tell(file) == 0) - if (unlikely(my_b_write_string(file, "\nBINLOG '\n"))) - error= 1; - if (likely(!error) && unlikely(my_b_printf(file, "%s\n", tmp_str))) - error= 1; - if (!more && likely(!error)) - if (unlikely(my_b_printf(file, "'%s\n", print_event_info->delimiter))) - error= 1; + my_b_printf(file, "%s\n", tmp_str); my_free(tmp_str); - if (unlikely(error)) - goto err; } #ifdef WHEN_FLASHBACK_REVIEW_READY @@ -3877,9 +3879,22 @@ bool Log_event::print_base64(IO_CACHE* file, } #else if (print_event_info->verbose) + { + /* + Verbose event printout can't start before encoded data + got enquoted. This is done at this point though multi-row + statement remain vulnerable. + TODO: fix MDEV-10362 to remove this workaround. + */ + if (print_event_info->base64_output_mode != + BASE64_OUTPUT_DECODE_ROWS) + my_b_printf(file, "'%s\n", print_event_info->delimiter); error= ev->print_verbose(file, print_event_info); + } else + { ev->count_row_events(print_event_info); + } #endif delete ev; if (unlikely(error)) @@ -5585,6 +5600,22 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, else thd->variables.collation_database= thd->db_charset; + { + const CHARSET_INFO *cs= thd->charset(); + /* + We cannot ask for parsing a statement using a character set + without state_maps (parser internal data). + */ + if (!cs->state_map) + { + rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER_THD(thd, ER_SLAVE_FATAL_ERROR), + "character_set cannot be parsed"); + thd->is_slave_error= true; + goto end; + } + } + /* Record any GTID in the same transaction, so slave state is transactionally consistent. @@ -6023,11 +6054,18 @@ bool Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER && !print_event_info->short_form) { - if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) - if (my_b_printf(&cache, "BINLOG '\n")) - goto err; - if (print_base64(&cache, print_event_info, FALSE)) + /* BINLOG is matched with the delimiter below on the same level */ + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS; + if (do_print_encoded) + my_b_printf(&cache, "BINLOG '\n"); + + if (print_base64(&cache, print_event_info, do_print_encoded)) goto err; + + if (do_print_encoded) + my_b_printf(&cache, "'%s\n", print_event_info->delimiter); + print_event_info->printed_fd_event= TRUE; } DBUG_RETURN(cache.flush_data()); @@ -9189,12 +9227,6 @@ User_var_log_event(const char* buf, uint event_len, val_len= uint4korr(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + UV_CHARSET_NUMBER_SIZE); - if (val + val_len > buf_end) - { - error= true; - goto err; - } - /** We need to check if this is from an old server that did not pack information for flags. @@ -10969,7 +11001,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, DBUG_VOID_RETURN; } size_t const data_size= event_len - read_size; - DBUG_PRINT("info",("m_table_id: %lu m_flags: %d m_width: %lu data_size: %lu", + DBUG_PRINT("info",("m_table_id: %llu m_flags: %d m_width: %lu data_size: %lu", m_table_id, m_flags, m_width, (ulong) data_size)); m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME)); @@ -11182,12 +11214,12 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)"); int error= 0; /* - If m_table_id == ~0UL, then we have a dummy event that does not + If m_table_id == ~0ULL, then we have a dummy event that does not contain any data. In that case, we just remove all tables in the tables_to_lock list, close the thread tables, and return with success. */ - if (m_table_id == ~0UL) + if (m_table_id == ~0ULL) { /* This one is supposed to be set: just an extra check so that @@ -11453,7 +11485,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) table= m_table= rgi->m_table_map.get_table(m_table_id); - DBUG_PRINT("debug", ("m_table:%p, m_table_id: %lu%s", + DBUG_PRINT("debug", ("m_table:%p, m_table_id: %llu%s", m_table, m_table_id, table && master_had_triggers ? " (master had triggers)" : "")); @@ -11819,14 +11851,14 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi) bool Rows_log_event::write_data_header() { uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer - DBUG_ASSERT(m_table_id != ~0UL); + DBUG_ASSERT(m_table_id != ~0ULL); DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", { int4store(buf + 0, m_table_id); int2store(buf + 4, m_flags); return (write_data(buf, 6)); }); - int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id); + int6store(buf + RW_MAPID_OFFSET, m_table_id); int2store(buf + RW_FLAGS_OFFSET, m_flags); return write_data(buf, ROWS_HEADER_LEN); } @@ -11894,12 +11926,226 @@ void Rows_log_event::pack_info(Protocol *protocol) char const *const flagstr= get_flags(STMT_END_F) ? " flags: STMT_END_F" : ""; size_t bytes= my_snprintf(buf, sizeof(buf), - "table_id: %lu%s", m_table_id, flagstr); + "table_id: %llu%s", m_table_id, flagstr); protocol->store(buf, bytes, &my_charset_bin); } #endif #ifdef MYSQL_CLIENT + +const char str_binlog[]= "\nBINLOG '\n"; +const char fmt_delim[]= "'%s\n"; +const char fmt_n_delim[]= "\n'%s"; +const char fmt_frag[]= "\nSET @binlog_fragment_%d ='\n"; +const char fmt_binlog2[]= "BINLOG @binlog_fragment_0, @binlog_fragment_1%s\n"; + +/** + Print an event "body" cache to @c file possibly in two fragments. + Each fragement is optionally per @c do_wrap to produce an SQL statement. + + @param file a file to print to + @param body the "body" IO_CACHE of event + @param do_wrap whether to wrap base64-encoded strings with + SQL cover. + @param delimiter delimiter string + + @param is_verbose MDEV-10362 workraround parameter to pass + info on presence of verbose printout in cache encoded data + + The function signals on any error through setting @c body->error to -1. +*/ +bool copy_cache_to_file_wrapped(IO_CACHE *body, + FILE *file, + bool do_wrap, + const char *delimiter, + bool is_verbose) +{ + const my_off_t cache_size= my_b_tell(body); + + if (reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE)) + goto err; + + if (!do_wrap) + { + my_b_copy_to_file(body, file, SIZE_T_MAX); + } + else if (4 + sizeof(str_binlog) + cache_size + sizeof(fmt_delim) > + opt_binlog_rows_event_max_encoded_size) + { + /* + 2 fragments can always represent near 1GB row-based + base64-encoded event as two strings each of size less than + max(max_allowed_packet). Greater number of fragments does not + save from potential need to tweak (increase) @@max_allowed_packet + before to process the fragments. So 2 is safe and enough. + + Split the big query when its packet size's estimation exceeds a + limit. The estimate includes the maximum packet header + contribution of non-compressed packet. + */ + my_fprintf(file, fmt_frag, 0); + if (my_b_copy_to_file(body, file, (size_t) cache_size/2 + 1)) + goto err; + my_fprintf(file, fmt_n_delim, delimiter); + + my_fprintf(file, fmt_frag, 1); + if (my_b_copy_to_file(body, file, SIZE_T_MAX)) + goto err; + if (!is_verbose) + my_fprintf(file, fmt_delim, delimiter); + + my_fprintf(file, fmt_binlog2, delimiter); + } + else + { + my_fprintf(file, str_binlog); + if (my_b_copy_to_file(body, file, SIZE_T_MAX)) + goto err; + if (!is_verbose) + my_fprintf(file, fmt_delim, delimiter); + } + reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE); + + return false; + +err: + body->error = -1; + return true; +} + + +/** + Print an event "body" cache to @c file possibly in two fragments. + Each fragement is optionally per @c do_wrap to produce an SQL statement. + + @param file a file to print to + @param body the "body" IO_CACHE of event + @param do_wrap whether to wrap base64-encoded strings with + SQL cover. + @param delimiter delimiter string + + The function signals on any error through setting @c body->error to -1. +*/ +bool copy_cache_to_string_wrapped(IO_CACHE *cache, + LEX_STRING *to, + bool do_wrap, + const char *delimiter, + bool is_verbose) +{ + const my_off_t cache_size= my_b_tell(cache); + // contribution to total size estimate of formating + const size_t fmt_size= + sizeof(str_binlog) + 2*(sizeof(fmt_frag) + 2 /* %d */) + + sizeof(fmt_delim) + sizeof(fmt_n_delim) + + sizeof(fmt_binlog2) + + 3*PRINT_EVENT_INFO::max_delimiter_size; + + if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE)) + goto err; + + if (!(to->str= (char*) my_malloc((size_t)cache->end_of_file + fmt_size, + MYF(0)))) + { + perror("Out of memory: can't allocate memory in " + "copy_cache_to_string_wrapped()."); + goto err; + } + + if (!do_wrap) + { + if (my_b_read(cache, (uchar*) to->str, + (to->length= (size_t)cache->end_of_file))) + goto err; + } + else if (4 + sizeof(str_binlog) + cache_size + sizeof(fmt_delim) > + opt_binlog_rows_event_max_encoded_size) + { + /* + 2 fragments can always represent near 1GB row-based + base64-encoded event as two strings each of size less than + max(max_allowed_packet). Greater number of fragments does not + save from potential need to tweak (increase) @@max_allowed_packet + before to process the fragments. So 2 is safe and enough. + + Split the big query when its packet size's estimation exceeds a + limit. The estimate includes the maximum packet header + contribution of non-compressed packet. + */ + char *str= to->str; + size_t add_to_len; + + str += (to->length= sprintf(str, fmt_frag, 0)); + if (my_b_read(cache, (uchar*) str, (uint32) (cache_size/2 + 1))) + goto err; + str += (add_to_len = (uint32) (cache_size/2 + 1)); + to->length += add_to_len; + str += (add_to_len= sprintf(str, fmt_n_delim, delimiter)); + to->length += add_to_len; + + str += (add_to_len= sprintf(str, fmt_frag, 1)); + to->length += add_to_len; + if (my_b_read(cache, (uchar*) str, uint32(cache->end_of_file - (cache_size/2 + 1)))) + goto err; + str += (add_to_len= uint32(cache->end_of_file - (cache_size/2 + 1))); + to->length += add_to_len; + if (!is_verbose) + { + str += (add_to_len= sprintf(str , fmt_delim, delimiter)); + to->length += add_to_len; + } + to->length += sprintf(str, fmt_binlog2, delimiter); + } + else + { + char *str= to->str; + + str += (to->length= sprintf(str, str_binlog)); + if (my_b_read(cache, (uchar*) str, (size_t)cache->end_of_file)) + goto err; + str += cache->end_of_file; + to->length += (size_t)cache->end_of_file; + if (!is_verbose) + to->length += sprintf(str , fmt_delim, delimiter); + } + + reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE); + + return false; + +err: + cache->error= -1; + return true; +} + +/** + The function invokes base64 encoder to run on the current + event string and store the result into two caches. + When the event ends the current statement the caches are is copied into + the argument file. + Copying is also concerned how to wrap the event, specifically to produce + a valid SQL syntax. + When the encoded data size is within max(MAX_ALLOWED_PACKET) + a regular BINLOG query is composed. Otherwise it is build as fragmented + + SET @binlog_fragment_0='...'; + SET @binlog_fragment_1='...'; + BINLOG @binlog_fragment_0, @binlog_fragment_1; + + where fragments are represented by a pair of indexed user + "one shot" variables. + + @note + If any changes made don't forget to duplicate them to + Old_rows_log_event as long as it's supported. + + @param file pointer to IO_CACHE + @param print_event_info pointer to print_event_info specializing + what out of and how to print the event + @param name the name of a table that the event operates on + + The function signals on any error of cache access through setting + that cache's @c error to -1. +*/ bool Rows_log_event::print_helper(FILE *file, PRINT_EVENT_INFO *print_event_info, char const *const name) @@ -11909,18 +12155,24 @@ bool Rows_log_event::print_helper(FILE *file, #ifdef WHEN_FLASHBACK_REVIEW_READY IO_CACHE *const sql= &print_event_info->review_sql_cache; #endif + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER && + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && + !print_event_info->short_form; bool const last_stmt_event= get_flags(STMT_END_F); if (!print_event_info->short_form) { + char llbuff[22]; + print_header(head, print_event_info, !last_stmt_event); - if (my_b_printf(head, "\t%s: table id %lu%s\n", - name, m_table_id, + if (my_b_printf(head, "\t%s: table id %s%s\n", + name, ullstr(m_table_id, llbuff), last_stmt_event ? " flags: STMT_END_F" : "")) goto err; } if (!print_event_info->short_form || print_event_info->print_row_count) - if (print_base64(body, print_event_info, !last_stmt_event)) + if (print_base64(body, print_event_info, do_print_encoded)) goto err; if (last_stmt_event) @@ -11928,25 +12180,31 @@ bool Rows_log_event::print_helper(FILE *file, if (!is_flashback) { if (copy_event_cache_to_file_and_reinit(head, file) || - copy_event_cache_to_file_and_reinit(body, file)) + copy_cache_to_file_wrapped(body, file, do_print_encoded, + print_event_info->delimiter, + print_event_info->verbose)) goto err; } else { - LEX_STRING tmp_str; - if (copy_event_cache_to_string_and_reinit(head, &tmp_str)) - return 1; - output_buf.append(tmp_str.str, tmp_str.length); // Not \0 terminated - my_free(tmp_str.str); - if (copy_event_cache_to_string_and_reinit(body, &tmp_str)) - return 1; - output_buf.append(tmp_str.str, tmp_str.length); - my_free(tmp_str.str); + LEX_STRING tmp_str; + + if (copy_event_cache_to_string_and_reinit(head, &tmp_str)) + return 1; + output_buf.append(tmp_str.str, tmp_str.length); // Not \0 terminated); + my_free(tmp_str.str); + + if (copy_cache_to_string_wrapped(body, &tmp_str, do_print_encoded, + print_event_info->delimiter, + print_event_info->verbose)) + return 1; + output_buf.append(tmp_str.str, tmp_str.length); + my_free(tmp_str.str); #ifdef WHEN_FLASHBACK_REVIEW_READY - if (copy_event_cache_to_string_and_reinit(sql, &tmp_str)) - return 1; - output_buf.append(tmp_str.str, tmp_str.length); - my_free(tmp_str.str); + if (copy_event_cache_to_string_and_reinit(sql, &tmp_str)) + return 1; + output_buf.append(tmp_str.str, tmp_str.length); + my_free(tmp_str.str); #endif } } @@ -12217,7 +12475,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, uchar cbuf[MAX_INT_WIDTH]; uchar *cbuf_end; DBUG_ENTER("Table_map_log_event::Table_map_log_event(TABLE)"); - DBUG_ASSERT(m_table_id != ~0UL); + DBUG_ASSERT(m_table_id != ~0ULL); /* In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in table.cc / alloc_table_share(): @@ -12302,7 +12560,7 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len, #endif m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0), m_colcnt(0), m_coltype(0), - m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0), + m_memory(NULL), m_table_id(ULONGLONG_MAX), m_flags(0), m_data_size(0), m_field_metadata(0), m_field_metadata_size(0), m_null_bits(0), m_meta_memory(NULL) { @@ -12339,7 +12597,7 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len, post_start+= TM_FLAGS_OFFSET; } - DBUG_ASSERT(m_table_id != ~0UL); + DBUG_ASSERT(m_table_id != ~0ULL); m_flags= uint2korr(post_start); @@ -12659,7 +12917,7 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi) table_list->updating= 1; table_list->required_type= TABLE_TYPE_NORMAL; - DBUG_PRINT("debug", ("table: %s is mapped to %lu", + DBUG_PRINT("debug", ("table: %s is mapped to %llu", table_list->table_name.str, table_list->table_id)); table_list->master_had_triggers= ((m_flags & TM_BIT_HAS_TRIGGERS_F) ? 1 : 0); @@ -12761,7 +13019,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi) #ifndef MYSQL_CLIENT bool Table_map_log_event::write_data_header() { - DBUG_ASSERT(m_table_id != ~0UL); + DBUG_ASSERT(m_table_id != ~0ULL); uchar buf[TABLE_MAP_HEADER_LEN]; DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", { @@ -12769,7 +13027,7 @@ bool Table_map_log_event::write_data_header() int2store(buf + 4, m_flags); return (write_data(buf, 6)); }); - int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id); + int6store(buf + TM_MAPID_OFFSET, m_table_id); int2store(buf + TM_FLAGS_OFFSET, m_flags); return write_data(buf, TABLE_MAP_HEADER_LEN); } @@ -12819,7 +13077,7 @@ void Table_map_log_event::pack_info(Protocol *protocol) { char buf[256]; size_t bytes= my_snprintf(buf, sizeof(buf), - "table_id: %lu (%s.%s)", + "table_id: %llu (%s.%s)", m_table_id, m_dbnam, m_tblnam); protocol->store(buf, bytes, &my_charset_bin); } @@ -12834,17 +13092,25 @@ bool Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) { if (!print_event_info->short_form) { + char llbuff[22]; + print_header(&print_event_info->head_cache, print_event_info, TRUE); if (my_b_printf(&print_event_info->head_cache, - "\tTable_map: %`s.%`s mapped to number %lu%s\n", - m_dbnam, m_tblnam, m_table_id, + "\tTable_map: %`s.%`s mapped to number %s%s\n", + m_dbnam, m_tblnam, ullstr(m_table_id, llbuff), ((m_flags & TM_BIT_HAS_TRIGGERS_F) ? " (has triggers)" : ""))) goto err; } if (!print_event_info->short_form || print_event_info->print_row_count) { - if (print_base64(&print_event_info->body_cache, print_event_info, TRUE) || + bool do_print_encoded= + print_event_info->base64_output_mode != BASE64_OUTPUT_NEVER && + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && + !print_event_info->short_form; + + if (print_base64(&print_event_info->body_cache, print_event_info, + do_print_encoded) || copy_event_cache_to_file_and_reinit(&print_event_info->head_cache, file)) goto err; @@ -14768,7 +15034,7 @@ err: bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache, FILE *file) { - return (my_b_copy_to_file(cache, file) || + return (my_b_copy_all_to_file(cache, file) || reinit_io_cache(cache, WRITE_CACHE, 0, FALSE, TRUE)); } |