diff options
author | unknown <mats@romeo.(none)> | 2006-10-06 10:17:02 +0200 |
---|---|---|
committer | unknown <mats@romeo.(none)> | 2006-10-06 10:17:02 +0200 |
commit | d8be3113351929f12e4277f4306a1428a280d970 (patch) | |
tree | c53b3dcab5ecc776912602d4cd486b053c124aec | |
parent | d9b292808461af13c9c17a6260475524bc60b728 (diff) | |
download | mariadb-git-d8be3113351929f12e4277f4306a1428a280d970.tar.gz |
BUG#19459 (BINLOG RBR command does not lock tables correctly causing
crash for, e.g., NDB):
Before, mysqlbinlog printed table map events as a separate statement, so
when executing the event, the opened table was subsequently closed
when the statement ended. Instead, the row-based events that make up
a statement are now printed as *one* BINLOG statement, which means
that the table maps and the following *_rows_log_event events are
executed fully before the statement ends.
Changing implementation of BINLOG statement to be able to read the
emitted format, which now consists of several chunks of BASE64-encoded
data.
client/mysqlbinlog.cc:
Using IO_CACHE to print events instead of directly to file.
Factoring out code to write event header and base64 representation into
separate function.
mysys/mf_iocache2.c:
Correcting name in documentation.
sql/log_event.cc:
Adding class Write_on_release_cache that holds an IO_CACHE and that
will write contents of IO_CACHE to a designated file on destruction.
Changing signature of event printing functions print_header() and print_base64()
to write to IO_CACHE and changing *all* calls in those functions in accordance.
This means that all printing functions now print to an IO_CACHE instead of to a file,
and that the IO_CACHE is then copied to the file.
The print() function have the same signature as before, but since it is
using print_header() and print_base64(), the data will now be printed
to an IO_CACHE and then copied to the file.
Changing row-based replication events to incrementally build one
BINLOG statement for all events making up a statement.
sql/log_event.h:
Changing signature of event printing functions print_header() and
print_base64() to write to an IO_CACHE instead of a file.
Changing row-based replication events to incrementally build one
BINLOG statement for all events making up a statement.
Adding a head_cache and a body_cache to cache statement comment
and statement body respectively. In addition, the head_cache is used
when printing other events than the RBR events.
sql/sql_binlog.cc:
Changing code to be able to decode several pieces of base64-encoded data
for a BINLOG statement. The BINLOG statement now consists of several pieces
of BASE64-encoded data, so once a block has been decoded and executed, the
next block has to be read from the statement until there is no more
data to read.
-rw-r--r-- | client/mysqlbinlog.cc | 35 | ||||
-rw-r--r-- | mysys/mf_iocache2.c | 2 | ||||
-rw-r--r-- | sql/log_event.cc | 575 | ||||
-rw-r--r-- | sql/log_event.h | 30 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 133 |
5 files changed, 518 insertions, 257 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index 81ad74466eb..f6d48fdef72 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -475,6 +475,30 @@ static bool check_database(const char *log_dbname) } + +static int +write_event_header_and_base64(Log_event *ev, FILE *result_file, + PRINT_EVENT_INFO *print_event_info) +{ + DBUG_ENTER("write_event_header_and_base64"); + /* Write header and base64 output to cache */ + IO_CACHE result_cache; + if (init_io_cache(&result_cache, -1, 0, WRITE_CACHE, 0L, FALSE, + MYF(MY_WME | MY_NABP))) + { + return 1; + } + + ev->print_header(&result_cache, print_event_info, FALSE); + ev->print_base64(&result_cache, print_event_info, FALSE); + + /* Read data from cache and write to result file */ + my_b_copy_to_file(&result_cache, result_file); + end_io_cache(&result_cache); + DBUG_RETURN(0); +} + + /* Process an event @@ -537,18 +561,18 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, print_event_info->base64_output= opt_base64_output; + DBUG_PRINT("debug", ("event_type: %s", ev->get_type_str())); + switch (ev_type) { case QUERY_EVENT: if (check_database(((Query_log_event*)ev)->db)) goto end; if (opt_base64_output) - { - ev->print_header(result_file, print_event_info); - ev->print_base64(result_file, print_event_info); - } + write_event_header_and_base64(ev, result_file, print_event_info); else ev->print(result_file, print_event_info); break; + case CREATE_FILE_EVENT: { Create_file_log_event* ce= (Create_file_log_event*)ev; @@ -569,8 +593,7 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, */ if (opt_base64_output) { - ce->print_header(result_file, print_event_info); - ce->print_base64(result_file, print_event_info); + write_event_header_and_base64(ce, result_file, print_event_info); } else ce->print(result_file, print_event_info, TRUE); diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c index d76b895aeb0..57a06e263a6 100644 --- a/mysys/mf_iocache2.c +++ b/mysys/mf_iocache2.c @@ -28,7 +28,7 @@ Copy contents of an IO_CACHE to a file. SYNOPSIS - copy_io_cache_to_file() + my_b_copy_to_file() cache IO_CACHE to copy from file File to copy to diff --git a/sql/log_event.cc b/sql/log_event.cc index bf876366879..9cccce8f679 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -32,31 +32,110 @@ #define log_cs &my_charset_latin1 /* + Cache that will automatically be written to a dedicated file on + destruction. + + DESCRIPTION + + */ +class Write_on_release_cache +{ +public: + enum flag + { + FLUSH_F + }; + + typedef unsigned short flag_set; + + /* + Constructor. + + SYNOPSIS + Write_on_release_cache + cache Pointer to cache to use + file File to write cache to upon destruction + flags Flags for the cache + + DESCRIPTION + + Class used to guarantee copy of cache to file before exiting the + current block. On successful copy of the cache, the cache will + be reinited as a WRITE_CACHE. + + Currently, a pointer to the cache is provided in the + constructor, but it would be possible to create a subclass + holding the IO_CACHE itself. + */ + Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0) + : m_cache(cache), m_file(file), m_flags(flags) + { + reinit_io_cache(m_cache, WRITE_CACHE, 0L, FALSE, TRUE); + } + + ~Write_on_release_cache() + { + if (!my_b_copy_to_file(m_cache, m_file)) + reinit_io_cache(m_cache, WRITE_CACHE, 0L, FALSE, TRUE); + if (m_flags | FLUSH_F) + fflush(m_file); + } + + /* + Return a pointer to the internal IO_CACHE. + + SYNOPSIS + operator&() + + DESCRIPTION + Function to return a pointer to the internal, so that the object + can be treated as a IO_CACHE and used with the my_b_* IO_CACHE + functions + + RETURN VALUE + A pointer to the internal IO_CACHE. + */ + IO_CACHE *operator&() + { + return m_cache; + } + +private: + // Hidden, to prevent usage. + Write_on_release_cache(Write_on_release_cache const&); + + IO_CACHE *m_cache; + FILE *m_file; + flag_set m_flags; +}; + + +/* pretty_print_str() */ #ifdef MYSQL_CLIENT -static void pretty_print_str(FILE* file, char* str, int len) +static void pretty_print_str(IO_CACHE* cache, char* str, int len) { char* end = str + len; - fputc('\'', file); + my_b_printf(cache, "\'"); while (str < end) { char c; switch ((c=*str++)) { - case '\n': fprintf(file, "\\n"); break; - case '\r': fprintf(file, "\\r"); break; - case '\\': fprintf(file, "\\\\"); break; - case '\b': fprintf(file, "\\b"); break; - case '\t': fprintf(file, "\\t"); break; - case '\'': fprintf(file, "\\'"); break; - case 0 : fprintf(file, "\\0"); break; + case '\n': my_b_printf(cache, "\\n"); break; + case '\r': my_b_printf(cache, "\\r"); break; + case '\\': my_b_printf(cache, "\\\\"); break; + case '\b': my_b_printf(cache, "\\b"); break; + case '\t': my_b_printf(cache, "\\t"); break; + case '\'': my_b_printf(cache, "\\'"); break; + case 0 : my_b_printf(cache, "\\0"); break; default: - fputc(c, file); + my_b_printf(cache, "%c", c); break; } } - fputc('\'', file); + my_b_printf(cache, "\'"); } #endif /* MYSQL_CLIENT */ @@ -293,14 +372,15 @@ append_query_string(CHARSET_INFO *csinfo, */ #ifdef MYSQL_CLIENT -static void print_set_option(FILE* file, uint32 bits_changed, uint32 option, - uint32 flags, const char* name, bool* need_comma) +static void print_set_option(IO_CACHE* file, uint32 bits_changed, + uint32 option, uint32 flags, const char* name, + bool* need_comma) { if (bits_changed & option) { if (*need_comma) - fprintf(file,", "); - fprintf(file,"%s=%d", name, test(flags & option)); + my_b_printf(file,", "); + my_b_printf(file,"%s=%d", name, test(flags & option)); *need_comma= 1; } } @@ -959,20 +1039,23 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, Log_event::print_header() */ -void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) +void Log_event::print_header(IO_CACHE* file, + PRINT_EVENT_INFO* print_event_info, + bool is_more __attribute__((unused))) { char llbuff[22]; my_off_t hexdump_from= print_event_info->hexdump_from; + DBUG_ENTER("Log_event::print_header"); - fputc('#', file); + my_b_printf(file, "#"); print_timestamp(file); - fprintf(file, " server id %d end_log_pos %s ", server_id, - llstr(log_pos,llbuff)); + my_b_printf(file, " server id %d end_log_pos %s ", server_id, + llstr(log_pos,llbuff)); /* mysqlbinlog --hexdump */ if (print_event_info->hexdump_from) { - fprintf(file, "\n"); + my_b_printf(file, "\n"); uchar *ptr= (uchar*)temp_buf; my_off_t size= uint4korr(ptr + EVENT_LEN_OFFSET) - LOG_EVENT_MINIMAL_HEADER_LEN; @@ -985,15 +1068,21 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) /* Pretty-print event common header if header is exactly 19 bytes */ if (print_event_info->common_header_len == LOG_EVENT_MINIMAL_HEADER_LEN) { - fprintf(file, "# Position Timestamp Type Master ID " - "Size Master Pos Flags \n"); - fprintf(file, "# %8.8lx %02x %02x %02x %02x %02x " - "%02x %02x %02x %02x %02x %02x %02x %02x " - "%02x %02x %02x %02x %02x %02x\n", - (unsigned long) hexdump_from, - ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6], - ptr[7], ptr[8], ptr[9], ptr[10], ptr[11], ptr[12], ptr[13], - ptr[14], ptr[15], ptr[16], ptr[17], ptr[18]); + char emit_buf[256]; // Enough for storing one line + my_b_printf(file, "# Position Timestamp Type Master ID " + "Size Master Pos Flags \n"); + int const bytes_written= + my_snprintf(emit_buf, sizeof(emit_buf), + "# %8.8lx %02x %02x %02x %02x %02x " + "%02x %02x %02x %02x %02x %02x %02x %02x " + "%02x %02x %02x %02x %02x %02x\n", + (unsigned long) hexdump_from, + ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6], + ptr[7], ptr[8], ptr[9], ptr[10], ptr[11], ptr[12], ptr[13], + ptr[14], ptr[15], ptr[16], ptr[17], ptr[18]); + DBUG_ASSERT(bytes_written >= 0); + DBUG_ASSERT(static_cast<my_size_t>(bytes_written) < sizeof(emit_buf)); + my_b_write(file, emit_buf, bytes_written); ptr += LOG_EVENT_MINIMAL_HEADER_LEN; hexdump_from += LOG_EVENT_MINIMAL_HEADER_LEN; } @@ -1010,9 +1099,21 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) if (i % 16 == 15) { - fprintf(file, "# %8.8lx %-48.48s |%16s|\n", - (unsigned long) (hexdump_from + (i & 0xfffffff0)), - hex_string, char_string); + /* + my_b_printf() does not support full printf() formats, so we + have to do it this way. + + TODO: Rewrite my_b_printf() to support full printf() syntax. + */ + char emit_buf[256]; + int const bytes_written= + my_snprintf(emit_buf, sizeof(emit_buf), + "# %8.8lx %-48.48s |%16s|\n", + (unsigned long) (hexdump_from + (i & 0xfffffff0)), + hex_string, char_string); + DBUG_ASSERT(bytes_written >= 0); + DBUG_ASSERT(static_cast<my_size_t>(bytes_written) < sizeof(emit_buf)); + my_b_write(file, emit_buf, bytes_written); hex_string[0]= 0; char_string[0]= 0; c= char_string; @@ -1024,28 +1125,52 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) /* Non-full last line */ if (hex_string[0]) - fprintf(file, "# %8.8lx %-48.48s |%s|\n# ", - (unsigned long) (hexdump_from + (i & 0xfffffff0)), - hex_string, char_string); + { + char emit_buf[256]; + int const bytes_written= + my_snprintf(emit_buf, sizeof(emit_buf), + "# %8.8lx %-48.48s |%s|\n# ", + (unsigned long) (hexdump_from + (i & 0xfffffff0)), + hex_string, char_string); + DBUG_ASSERT(bytes_written >= 0); + DBUG_ASSERT(static_cast<my_size_t>(bytes_written) < sizeof(emit_buf)); + my_b_write(file, emit_buf, bytes_written); + } } + DBUG_VOID_RETURN; } -void Log_event::print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info) +void Log_event::print_base64(IO_CACHE* file, + PRINT_EVENT_INFO* print_event_info, + bool more) { - uchar *ptr= (uchar*)temp_buf; + const uchar *ptr= (const uchar *)temp_buf; my_off_t size= uint4korr(ptr + EVENT_LEN_OFFSET); - char *tmp_str= - (char *) my_malloc(base64_needed_encoded_length(size), MYF(MY_WME)); + DBUG_ENTER("Log_event::print_base64"); + + size_t const tmp_str_sz= base64_needed_encoded_length(size); + char *const tmp_str= (char *) my_malloc(tmp_str_sz, MYF(MY_WME)); if (!tmp_str) { fprintf(stderr, "\nError: Out of memory. " "Could not print correct binlog event.\n"); - return; + DBUG_VOID_RETURN; } - int res= base64_encode(ptr, size, tmp_str); - fprintf(file, "\nBINLOG '\n%s\n';\n", tmp_str); + + int const res= base64_encode(ptr, size, tmp_str); + DBUG_ASSERT(res == 0); + + if (my_b_tell(file) == 0) + my_b_printf(file, "\nBINLOG '\n"); + + my_b_printf(file, "%s\n", tmp_str); + + if (!more) + my_b_printf(file, "';\n"); + my_free(tmp_str, MYF(0)); + DBUG_VOID_RETURN; } @@ -1053,9 +1178,10 @@ void Log_event::print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info) Log_event::print_timestamp() */ -void Log_event::print_timestamp(FILE* file, time_t* ts) +void Log_event::print_timestamp(IO_CACHE* file, time_t* ts) { struct tm *res; + DBUG_ENTER("Log_event::print_timestamp"); if (!ts) ts = &when; #ifdef MYSQL_SERVER // This is always false @@ -1065,13 +1191,14 @@ void Log_event::print_timestamp(FILE* file, time_t* ts) res=localtime(ts); #endif - fprintf(file,"%02d%02d%02d %2d:%02d:%02d", - res->tm_year % 100, - res->tm_mon+1, - res->tm_mday, - res->tm_hour, - res->tm_min, - res->tm_sec); + my_b_printf(file,"%02d%02d%02d %2d:%02d:%02d", + res->tm_year % 100, + res->tm_mon+1, + res->tm_mday, + res->tm_hour, + res->tm_min, + res->tm_sec); + DBUG_VOID_RETURN; } #endif /* MYSQL_CLIENT */ @@ -1531,7 +1658,7 @@ Query_log_event::Query_log_event(const char* buf, uint event_len, */ #ifdef MYSQL_CLIENT -void Query_log_event::print_query_header(FILE* file, +void Query_log_event::print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info) { // TODO: print the catalog ?? @@ -1541,9 +1668,10 @@ void Query_log_event::print_query_header(FILE* file, if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n", - get_type_str(), (ulong) thread_id, (ulong) exec_time, error_code); + print_header(file, print_event_info, FALSE); + my_b_printf(file, "\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n", + get_type_str(), (ulong) thread_id, (ulong) exec_time, + error_code); } if (!(flags & LOG_EVENT_SUPPRESS_USE_F) && db) @@ -1551,15 +1679,15 @@ void Query_log_event::print_query_header(FILE* file, if (different_db= memcmp(print_event_info->db, db, db_len + 1)) memcpy(print_event_info->db, db, db_len + 1); if (db[0] && different_db) - fprintf(file, "use %s;\n", db); + my_b_printf(file, "use %s;\n", db); } end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10); *end++=';'; *end++='\n'; - my_fwrite(file, (byte*) buff, (uint) (end-buff),MYF(MY_NABP | MY_WME)); + my_b_write(file, (byte*) buff, (uint) (end-buff)); if (flags & LOG_EVENT_THREAD_SPECIFIC_F) - fprintf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id); + my_b_printf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id); /* If flags2_inited==0, this is an event from 3.23 or 4.0; nothing to @@ -1581,14 +1709,14 @@ void Query_log_event::print_query_header(FILE* file, if (unlikely(tmp)) /* some bits have changed */ { bool need_comma= 0; - fprintf(file, "SET "); + my_b_printf(file, "SET "); print_set_option(file, tmp, OPTION_NO_FOREIGN_KEY_CHECKS, ~flags2, "@@session.foreign_key_checks", &need_comma); print_set_option(file, tmp, OPTION_AUTO_IS_NULL, flags2, "@@session.sql_auto_is_null", &need_comma); print_set_option(file, tmp, OPTION_RELAXED_UNIQUE_CHECKS, ~flags2, "@@session.unique_checks", &need_comma); - fprintf(file,";\n"); + my_b_printf(file,";\n"); print_event_info->flags2= flags2; } } @@ -1616,14 +1744,14 @@ void Query_log_event::print_query_header(FILE* file, } if (unlikely(print_event_info->sql_mode != sql_mode)) { - fprintf(file,"SET @@session.sql_mode=%lu;\n",(ulong)sql_mode); + my_b_printf(file,"SET @@session.sql_mode=%lu;\n",(ulong)sql_mode); print_event_info->sql_mode= sql_mode; } } if (print_event_info->auto_increment_increment != auto_increment_increment || print_event_info->auto_increment_offset != auto_increment_offset) { - fprintf(file,"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu;\n", + my_b_printf(file,"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu;\n", auto_increment_increment,auto_increment_offset); print_event_info->auto_increment_increment= auto_increment_increment; print_event_info->auto_increment_offset= auto_increment_offset; @@ -1643,16 +1771,17 @@ void Query_log_event::print_query_header(FILE* file, CHARSET_INFO *cs_info= get_charset(uint2korr(charset), MYF(MY_WME)); if (cs_info) { - fprintf(file, "/*!\\C %s */;\n", cs_info->csname); /* for mysql client */ + /* for mysql client */ + my_b_printf(file, "/*!\\C %s */;\n", cs_info->csname); } - fprintf(file,"SET " - "@@session.character_set_client=%d," - "@@session.collation_connection=%d," - "@@session.collation_server=%d" - ";\n", - uint2korr(charset), - uint2korr(charset+2), - uint2korr(charset+4)); + my_b_printf(file,"SET " + "@@session.character_set_client=%d," + "@@session.collation_connection=%d," + "@@session.collation_server=%d" + ";\n", + uint2korr(charset), + uint2korr(charset+2), + uint2korr(charset+4)); memcpy(print_event_info->charset, charset, 6); } } @@ -1660,7 +1789,7 @@ void Query_log_event::print_query_header(FILE* file, { if (bcmp(print_event_info->time_zone_str, time_zone_str, time_zone_len+1)) { - fprintf(file,"SET @@session.time_zone='%s';\n", time_zone_str); + my_b_printf(file,"SET @@session.time_zone='%s';\n", time_zone_str); memcpy(print_event_info->time_zone_str, time_zone_str, time_zone_len+1); } } @@ -1669,9 +1798,11 @@ void Query_log_event::print_query_header(FILE* file, void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { - print_query_header(file, print_event_info); - my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); - fputs(";\n", file); + Write_on_release_cache cache(&print_event_info->head_cache, file); + + print_query_header(&cache, print_event_info); + my_b_write(&cache, (byte*) query, q_len); + my_b_printf(&cache, ";\n"); } #endif /* MYSQL_CLIENT */ @@ -1971,18 +2102,23 @@ void Start_log_event_v3::pack_info(Protocol *protocol) #ifdef MYSQL_CLIENT void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + DBUG_ENTER("Start_log_event_v3::print"); + + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version, - server_version); - print_timestamp(file); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tStart: binlog v %d, server v %s created ", + binlog_version, server_version); + print_timestamp(&cache); if (created) - fprintf(file," at startup"); - fputc('\n', file); + my_b_printf(&cache," at startup"); + my_b_printf(&cache, "\n"); if (flags & LOG_EVENT_BINLOG_IN_USE_F) - fprintf(file, "# Warning: this binlog was not closed properly. " - "Most probably mysqld crashed writing it.\n"); + my_b_printf(&cache, "# Warning: this binlog was not closed properly. " + "Most probably mysqld crashed writing it.\n"); } if (!artificial_event && created) { @@ -1993,12 +2129,12 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) and rollback unfinished transaction. Probably this can be done with RESET CONNECTION (syntax to be defined). */ - fprintf(file,"RESET CONNECTION;\n"); + my_b_printf(&cache,"RESET CONNECTION;\n"); #else - fprintf(file,"ROLLBACK;\n"); + my_b_printf(&cache,"ROLLBACK;\n"); #endif } - fflush(file); + DBUG_VOID_RETURN; } #endif /* MYSQL_CLIENT */ @@ -2722,15 +2858,17 @@ void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) } -void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, +void Load_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info, bool commented) { + Write_on_release_cache cache(&print_event_info->head_cache, file_arg); + DBUG_ENTER("Load_log_event::print"); if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tQuery\tthread_id=%ld\texec_time=%ld\n", - thread_id, exec_time); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tQuery\tthread_id=%ld\texec_time=%ld\n", + thread_id, exec_time); } bool different_db= 1; @@ -2748,65 +2886,65 @@ void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, } if (db && db[0] && different_db) - fprintf(file, "%suse %s;\n", + my_b_printf(&cache, "%suse %s;\n", commented ? "# " : "", db); if (flags & LOG_EVENT_THREAD_SPECIFIC_F) - fprintf(file,"%sSET @@session.pseudo_thread_id=%lu;\n", + my_b_printf(&cache,"%sSET @@session.pseudo_thread_id=%lu;\n", commented ? "# " : "", (ulong)thread_id); - fprintf(file, "%sLOAD DATA ", + my_b_printf(&cache, "%sLOAD DATA ", commented ? "# " : ""); if (check_fname_outside_temp_buf()) - fprintf(file, "LOCAL "); - fprintf(file, "INFILE '%-*s' ", fname_len, fname); + my_b_printf(&cache, "LOCAL "); + my_b_printf(&cache, "INFILE '%-*s' ", fname_len, fname); if (sql_ex.opt_flags & REPLACE_FLAG) - fprintf(file," REPLACE "); + my_b_printf(&cache," REPLACE "); else if (sql_ex.opt_flags & IGNORE_FLAG) - fprintf(file," IGNORE "); + my_b_printf(&cache," IGNORE "); - fprintf(file, "INTO TABLE `%s`", table_name); - fprintf(file, " FIELDS TERMINATED BY "); - pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len); + my_b_printf(&cache, "INTO TABLE `%s`", table_name); + my_b_printf(&cache, " FIELDS TERMINATED BY "); + pretty_print_str(&cache, sql_ex.field_term, sql_ex.field_term_len); if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG) - fprintf(file," OPTIONALLY "); - fprintf(file, " ENCLOSED BY "); - pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len); + my_b_printf(&cache," OPTIONALLY "); + my_b_printf(&cache, " ENCLOSED BY "); + pretty_print_str(&cache, sql_ex.enclosed, sql_ex.enclosed_len); - fprintf(file, " ESCAPED BY "); - pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len); + my_b_printf(&cache, " ESCAPED BY "); + pretty_print_str(&cache, sql_ex.escaped, sql_ex.escaped_len); - fprintf(file," LINES TERMINATED BY "); - pretty_print_str(file, sql_ex.line_term, sql_ex.line_term_len); + my_b_printf(&cache," LINES TERMINATED BY "); + pretty_print_str(&cache, sql_ex.line_term, sql_ex.line_term_len); if (sql_ex.line_start) { - fprintf(file," STARTING BY "); - pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len); + my_b_printf(&cache," STARTING BY "); + pretty_print_str(&cache, sql_ex.line_start, sql_ex.line_start_len); } if ((long) skip_lines > 0) - fprintf(file, " IGNORE %ld LINES", (long) skip_lines); + my_b_printf(&cache, " IGNORE %ld LINES", (long) skip_lines); if (num_fields) { uint i; const char* field = fields; - fprintf(file, " ("); + my_b_printf(&cache, " ("); for (i = 0; i < num_fields; i++) { if (i) - fputc(',', file); - fprintf(file, field); + my_b_printf(&cache, ","); + my_b_printf(&cache, field); field += field_lens[i] + 1; } - fputc(')', file); + my_b_printf(&cache, ")"); } - fprintf(file, ";\n"); + my_b_printf(&cache, ";\n"); DBUG_VOID_RETURN; } #endif /* MYSQL_CLIENT */ @@ -3139,17 +3277,16 @@ void Rotate_log_event::pack_info(Protocol *protocol) void Rotate_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { char buf[22]; + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); if (print_event_info->short_form) return; - print_header(file, print_event_info); - fprintf(file, "\tRotate to "); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tRotate to "); if (new_log_ident) - my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, - MYF(MY_NABP | MY_WME)); - fprintf(file, " pos: %s", llstr(pos, buf)); - fputc('\n', file); - fflush(file); + my_b_write(&cache, (byte*) new_log_ident, (uint)ident_len); + my_b_printf(&cache, " pos: %s\n", llstr(pos, buf)); } #endif /* MYSQL_CLIENT */ @@ -3364,14 +3501,16 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) char llbuff[22]; const char *msg; LINT_INIT(msg); + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tIntvar\n"); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tIntvar\n"); } - fprintf(file, "SET "); + my_b_printf(&cache, "SET "); switch (type) { case LAST_INSERT_ID_EVENT: msg="LAST_INSERT_ID"; @@ -3384,8 +3523,7 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) msg="INVALID_INT"; break; } - fprintf(file, "%s=%s;\n", msg, llstr(val,llbuff)); - fflush(file); + my_b_printf(&cache, "%s=%s;\n", msg, llstr(val,llbuff)); } #endif @@ -3454,15 +3592,17 @@ bool Rand_log_event::write(IO_CACHE* file) #ifdef MYSQL_CLIENT void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + char llbuff[22],llbuff2[22]; if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tRand\n"); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tRand\n"); } - fprintf(file, "SET @@RAND_SEED1=%s, @@RAND_SEED2=%s;\n", - llstr(seed1, llbuff),llstr(seed2, llbuff2)); - fflush(file); + my_b_printf(&cache, "SET @@RAND_SEED1=%s, @@RAND_SEED2=%s;\n", + llstr(seed1, llbuff),llstr(seed2, llbuff2)); } #endif /* MYSQL_CLIENT */ @@ -3524,16 +3664,18 @@ bool Xid_log_event::write(IO_CACHE* file) #ifdef MYSQL_CLIENT void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + if (!print_event_info->short_form) { char buf[64]; longlong10_to_str(xid, buf, 10); - print_header(file, print_event_info); - fprintf(file, "\tXid = %s\n", buf); - fflush(file); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tXid = %s\n", buf); } - fprintf(file, "COMMIT;\n"); + my_b_printf(&cache, "COMMIT;\n"); } #endif /* MYSQL_CLIENT */ @@ -3723,19 +3865,22 @@ bool User_var_log_event::write(IO_CACHE* file) #ifdef MYSQL_CLIENT void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tUser_var\n"); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tUser_var\n"); } - fprintf(file, "SET @`"); - my_fwrite(file, (byte*) name, (uint) (name_len), MYF(MY_NABP | MY_WME)); - fprintf(file, "`"); + my_b_printf(&cache, "SET @`"); + my_b_write(&cache, (byte*) name, (uint) (name_len)); + my_b_printf(&cache, "`"); if (is_null) { - fprintf(file, ":=NULL;\n"); + my_b_printf(&cache, ":=NULL;\n"); } else { @@ -3743,12 +3888,12 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) case REAL_RESULT: double real_val; float8get(real_val, val); - fprintf(file, ":=%.14g;\n", real_val); + my_b_printf(&cache, ":=%.14g;\n", real_val); break; case INT_RESULT: char int_buf[22]; longlong10_to_str(uint8korr(val), int_buf, -10); - fprintf(file, ":=%s;\n", int_buf); + my_b_printf(&cache, ":=%s;\n", int_buf); break; case DECIMAL_RESULT: { @@ -3764,7 +3909,7 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) bin2decimal(val+2, &dec, precision, scale); decimal2string(&dec, str_buf, &str_len, 0, 0, 0); str_buf[str_len]= 0; - fprintf(file, ":=%s;\n",str_buf); + my_b_printf(&cache, ":=%s;\n",str_buf); break; } case STRING_RESULT: @@ -3800,9 +3945,9 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) Generate an unusable command (=> syntax error) is probably the best thing we can do here. */ - fprintf(file, ":=???;\n"); + my_b_printf(&cache, ":=???;\n"); else - fprintf(file, ":=_%s %s COLLATE `%s`;\n", cs->csname, hex_str, cs->name); + my_b_printf(&cache, ":=_%s %s COLLATE `%s`;\n", cs->csname, hex_str, cs->name); my_afree(hex_str); } break; @@ -3812,7 +3957,6 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) return; } } - fflush(file); } #endif @@ -3896,13 +4040,14 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli) #ifdef HAVE_REPLICATION #ifdef MYSQL_CLIENT -void Unknown_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) +void Unknown_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file_arg); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "# %s", "Unknown event\n"); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n# %s", "Unknown event\n"); } #endif @@ -3969,12 +4114,13 @@ Slave_log_event::~Slave_log_event() #ifdef MYSQL_CLIENT void Slave_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + char llbuff[22]; if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "\ + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n\ Slave: master_host: '%s' master_port: %d master_log: '%s' master_pos: %s\n", master_host, master_port, master_log, llstr(master_pos, llbuff)); } @@ -4054,12 +4200,14 @@ int Slave_log_event::exec_event(struct st_relay_log_info* rli) #ifdef MYSQL_CLIENT void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fprintf(file, "\tStop\n"); - fflush(file); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tStop\n"); } #endif /* MYSQL_CLIENT */ @@ -4234,6 +4382,8 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len, void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, bool enable_local) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + if (print_event_info->short_form) { if (enable_local && check_fname_outside_temp_buf()) @@ -4249,10 +4399,10 @@ void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info That one is for "file_id: etc" below: in mysqlbinlog we want the #, in SHOW BINLOG EVENTS we don't. */ - fprintf(file, "#"); + my_b_printf(&cache, "#"); } - fprintf(file, " file_id: %d block_len: %d\n", file_id, block_len); + my_b_printf(&cache, " file_id: %d block_len: %d\n", file_id, block_len); } @@ -4422,12 +4572,13 @@ bool Append_block_log_event::write(IO_CACHE* file) void Append_block_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "#%s: file_id: %d block_len: %d\n", - get_type_str(), file_id, block_len); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n#%s: file_id: %d block_len: %d\n", + get_type_str(), file_id, block_len); } #endif /* MYSQL_CLIENT */ @@ -4565,11 +4716,12 @@ bool Delete_file_log_event::write(IO_CACHE* file) void Delete_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "#Delete_file: file_id=%u\n", file_id); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n#Delete_file: file_id=%u\n", file_id); } #endif /* MYSQL_CLIENT */ @@ -4660,12 +4812,13 @@ bool Execute_load_log_event::write(IO_CACHE* file) void Execute_load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "#Exec_load: file_id=%d\n", - file_id); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n#Exec_load: file_id=%d\n", + file_id); } #endif @@ -4882,29 +5035,30 @@ void Execute_load_query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, const char *local_fname) { - print_query_header(file, print_event_info); + Write_on_release_cache cache(&print_event_info->head_cache, file); + + print_query_header(&cache, print_event_info); if (local_fname) { - my_fwrite(file, (byte*) query, fn_pos_start, MYF(MY_NABP | MY_WME)); - fprintf(file, " LOCAL INFILE \'"); - fprintf(file, local_fname); - fprintf(file, "\'"); + my_b_write(&cache, (byte*) query, fn_pos_start); + my_b_printf(&cache, " LOCAL INFILE \'"); + my_b_printf(&cache, local_fname); + my_b_printf(&cache, "\'"); if (dup_handling == LOAD_DUP_REPLACE) - fprintf(file, " REPLACE"); - fprintf(file, " INTO"); - my_fwrite(file, (byte*) query + fn_pos_end, q_len-fn_pos_end, - MYF(MY_NABP | MY_WME)); - fprintf(file, ";\n"); + my_b_printf(&cache, " REPLACE"); + my_b_printf(&cache, " INTO"); + my_b_write(&cache, (byte*) query + fn_pos_end, q_len-fn_pos_end); + my_b_printf(&cache, ";\n"); } else { - my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); - fprintf(file, ";\n"); + my_b_write(&cache, (byte*) query, q_len); + my_b_printf(&cache, ";\n"); } if (!print_event_info->short_form) - fprintf(file, "# file_id: %d \n", file_id); + my_b_printf(&cache, "# file_id: %d \n", file_id); } #endif @@ -5639,6 +5793,31 @@ void Rows_log_event::pack_info(Protocol *protocol) } #endif +#ifdef MYSQL_CLIENT +void Rows_log_event::print_helper(FILE *file, + PRINT_EVENT_INFO *print_event_info, + char const *const name) +{ + IO_CACHE *const head= &print_event_info->head_cache; + IO_CACHE *const body= &print_event_info->body_cache; + if (!print_event_info->short_form) + { + bool const last_stmt_event= get_flags(STMT_END_F); + print_header(head, print_event_info, !last_stmt_event); + my_b_printf(head, "\t%s: table id %lu", name, m_table_id); + print_base64(body, print_event_info, !last_stmt_event); + } + + if (get_flags(STMT_END_F)) + { + my_b_copy_to_file(head, file); + my_b_copy_to_file(body, file); + reinit_io_cache(head, WRITE_CACHE, 0, FALSE, TRUE); + reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE); + } +} +#endif + /************************************************************************** Table_map_log_event member functions **************************************************************************/ @@ -6049,10 +6228,11 @@ void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) { if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tTable_map: `%s`.`%s` mapped to number %lu\n", - m_dbnam, m_tblnam, m_table_id); - print_base64(file, print_event_info); + print_header(&print_event_info->head_cache, print_event_info, TRUE); + my_b_printf(&print_event_info->head_cache, + "\tTable_map: `%s`.`%s` mapped to number %lu\n", + m_dbnam, m_tblnam, m_table_id); + print_base64(&print_event_info->body_cache, print_event_info, TRUE); } } #endif @@ -6315,12 +6495,7 @@ int Write_rows_log_event::do_exec_row(TABLE *table) #ifdef MYSQL_CLIENT void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) { - if (!print_event_info->short_form) - { - print_header(file, print_event_info); - fprintf(file, "\tWrite_rows: table id %lu", m_table_id); - print_base64(file, print_event_info); - } + Rows_log_event::print_helper(file, print_event_info, "Write_rows"); } #endif @@ -6644,12 +6819,7 @@ int Delete_rows_log_event::do_exec_row(TABLE *table) void Delete_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) { - if (!print_event_info->short_form) - { - print_header(file, print_event_info); - fprintf(file, "\tDelete_rows: table id %lu", m_table_id); - print_base64(file, print_event_info); - } + Rows_log_event::print_helper(file, print_event_info, "Delete_rows"); } #endif @@ -6800,12 +6970,7 @@ int Update_rows_log_event::do_exec_row(TABLE *table) void Update_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) { - if (!print_event_info->short_form) - { - print_header(file, print_event_info); - fprintf(file, "\tUpdate_rows: table id %lu", m_table_id); - print_base64(file, print_event_info); - } + Rows_log_event::print_helper(file, print_event_info, "Update_rows"); } #endif diff --git a/sql/log_event.h b/sql/log_event.h index 36933f4a7dd..68e165e97ef 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -519,14 +519,30 @@ typedef struct st_print_event_info bzero(db, sizeof(db)); bzero(charset, sizeof(charset)); bzero(time_zone_str, sizeof(time_zone_str)); + uint const flags = MYF(MY_WME | MY_NABP); + init_io_cache(&head_cache, -1, 0, WRITE_CACHE, 0L, FALSE, flags); + init_io_cache(&body_cache, -1, 0, WRITE_CACHE, 0L, FALSE, flags); } + ~st_print_event_info() { + end_io_cache(&head_cache); + end_io_cache(&body_cache); + } + + /* Settings on how to print the events */ bool short_form; bool base64_output; my_off_t hexdump_from; uint8 common_header_len; + /* + These two caches are used by the row-based replication events to + collect the header information and the main body of the events + making up a statement. + */ + IO_CACHE head_cache; + IO_CACHE body_cache; } PRINT_EVENT_INFO; #endif @@ -637,9 +653,11 @@ public: const Format_description_log_event *description_event); /* print*() functions are used by mysqlbinlog */ virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0; - void print_timestamp(FILE* file, time_t *ts = 0); - void print_header(FILE* file, PRINT_EVENT_INFO* print_event_info); - void print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info); + void print_timestamp(IO_CACHE* file, time_t *ts = 0); + void print_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, + bool is_more); + void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, + bool is_more); #endif static void *operator new(size_t size) @@ -804,7 +822,7 @@ public: uint32 q_len_arg); #endif /* HAVE_REPLICATION */ #else - void print_query_header(FILE* file, PRINT_EVENT_INFO* print_event_info); + void print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info); void print(FILE* file, PRINT_EVENT_INFO* print_event_info); #endif @@ -1843,6 +1861,10 @@ protected: Log_event_type event_type, const Format_description_log_event *description_event); +#ifdef MYSQL_CLIENT + void print_helper(FILE *, PRINT_EVENT_INFO *, char const *const name); +#endif + #ifndef MYSQL_CLIENT virtual int do_add_row_data(byte *data, my_size_t length); #endif diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 0939ad66cd0..23ca5330053 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -31,6 +31,7 @@ void mysql_client_binlog_statement(THD* thd) { + DBUG_ENTER("mysql_client_binlog_statement"); DBUG_PRINT("info",("binlog base64: '%*s'", (thd->lex->comment.length < 2048 ? thd->lex->comment.length : 2048), @@ -43,8 +44,8 @@ void mysql_client_binlog_statement(THD* thd) my_bool nsok= thd->net.no_send_ok; thd->net.no_send_ok= TRUE; - const my_size_t coded_len= thd->lex->comment.length + 1; - const my_size_t event_len= base64_needed_decoded_length(coded_len); + my_size_t coded_len= thd->lex->comment.length + 1; + my_size_t decoded_len= base64_needed_decoded_length(coded_len); DBUG_ASSERT(coded_len > 0); /* @@ -57,9 +58,8 @@ void mysql_client_binlog_statement(THD* thd) new Format_description_log_event(4); const char *error= 0; - char *buf= (char *) my_malloc(event_len, MYF(MY_WME)); + char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME)); Log_event *ev = 0; - int res; /* Out of memory check @@ -73,43 +73,97 @@ void mysql_client_binlog_statement(THD* thd) thd->rli_fake->sql_thd= thd; thd->rli_fake->no_storage= TRUE; - res= base64_decode(thd->lex->comment.str, coded_len, buf); - - DBUG_PRINT("info",("binlog base64 decoded_len=%d, event_len=%d\n", - res, uint4korr(buf + EVENT_LEN_OFFSET))); - /* - Note that 'res' is the correct event length, 'event_len' was - calculated based on the base64-string that possibly contained - extra spaces, so it can be longer than the real event. - */ - if (res < EVENT_LEN_OFFSET - || (uint) res != uint4korr(buf+EVENT_LEN_OFFSET)) + for (char const *strptr= thd->lex->comment.str ; + strptr < thd->lex->comment.str + thd->lex->comment.length ; ) { - my_error(ER_SYNTAX_ERROR, MYF(0)); - goto end; - } - - ev= Log_event::read_log_event(buf, res, &error, desc); + char const *endptr= 0; + int bytes_decoded= base64_decode(strptr, coded_len, buf, &endptr); + + DBUG_PRINT("info", + ("bytes_decoded=%d; strptr=0x%lu; endptr=0x%lu ('%c':%d)", + bytes_decoded, strptr, endptr, *endptr, *endptr)); + + if (bytes_decoded < 0) + { + my_error(ER_BASE64_DECODE_ERROR, MYF(0)); + goto end; + } + else if (bytes_decoded == 0) + break; // If no bytes where read, the string contained only whitespace + + DBUG_ASSERT(bytes_decoded > 0); + DBUG_ASSERT(endptr > strptr); + coded_len-= endptr - strptr; + strptr= endptr; - DBUG_PRINT("info",("binlog base64 err=%s", error)); - if (!ev) - { /* - This could actually be an out-of-memory, but it is more - likely causes by a bad statement + Now we have one or more events stored in the buffer. The size of + the buffer is computed based on how much base64-encoded data + there were, so there should be ample space for the data (maybe + even too much, since a statement can consist of a considerable + number of events). + + TODO: Switch to use a stream-based base64 encoder/decoder in + order to be able to read exactly what is necessary. */ - my_error(ER_SYNTAX_ERROR, MYF(0)); - goto end; - } - DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code())); - DBUG_PRINT("info",("buf+EVENT_TYPE_OFFSET=%d", buf+EVENT_TYPE_OFFSET)); + DBUG_PRINT("info",("binlog base64 decoded_len=%d, bytes_decoded=%d", + decoded_len, bytes_decoded)); - ev->thd= thd; - if (ev->exec_event(thd->rli_fake)) - { - my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement"); - goto end; + /* + Now we start to read events of the buffer, until there are no + more. + */ + for (char *bufptr= buf ; bytes_decoded > 0 ; ) + { + /* + Checking that the first event in the buffer is not truncated. + */ + ulong event_len= uint4korr(bufptr + EVENT_LEN_OFFSET); + DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d", + event_len, bytes_decoded)); + if (bytes_decoded < EVENT_LEN_OFFSET || (uint) bytes_decoded < event_len) + { + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + ev= Log_event::read_log_event(bufptr, event_len, &error, desc); + + DBUG_PRINT("info",("binlog base64 err=%s", error)); + if (!ev) + { + /* + This could actually be an out-of-memory, but it is more likely + causes by a bad statement + */ + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + bytes_decoded -= event_len; + bufptr += event_len; + + DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code())); + DBUG_PRINT("info",("bufptr+EVENT_TYPE_OFFSET=0x%lx", + bufptr+EVENT_TYPE_OFFSET)); + DBUG_PRINT("info", ("bytes_decoded=%d; bufptr=0x%lx; buf[EVENT_LEN_OFFSET]=%u", + bytes_decoded, bufptr, uint4korr(bufptr+EVENT_LEN_OFFSET))); + ev->thd= thd; + if (int err= ev->exec_event(thd->rli_fake)) + { + DBUG_PRINT("info", ("exec_event() - error=%d", error)); + /* + TODO: Maybe a better error message since the BINLOG statement + now contains several events. + */ + my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement"); + goto end; + } + + delete ev; + ev= 0; + } } /* @@ -126,10 +180,7 @@ end: */ thd->net.no_send_ok= nsok; - if (ev) - delete ev; - if (desc) - delete desc; - if (buf) - my_free(buf, MYF(0)); + delete desc; + my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); + DBUG_VOID_RETURN; } |