diff options
-rw-r--r-- | sql/log.cc | 259 | ||||
-rw-r--r-- | sql/log.h | 3 | ||||
-rw-r--r-- | sql/log_event.cc | 461 | ||||
-rw-r--r-- | sql/log_event.h | 214 | ||||
-rw-r--r-- | sql/log_event_old.cc | 15 | ||||
-rw-r--r-- | sql/log_event_old.h | 4 | ||||
-rw-r--r-- | sql/slave.cc | 2 | ||||
-rw-r--r-- | sql/wsrep_binlog.cc | 3 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 9 |
9 files changed, 456 insertions, 514 deletions
diff --git a/sql/log.cc b/sql/log.cc index 0944c6e8b42..8d8c96f6703 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -3461,7 +3461,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, if (!s.is_valid()) goto err; s.dont_set_created= null_created_arg; - if (s.write(&log_file)) + if (write_event(&s)) goto err; bytes_written+= s.data_written; @@ -3504,7 +3504,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, */ Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0); - if (gl_ev.write(&log_file)) + if (write_event(&gl_ev)) goto err; /* Output a binlog checkpoint event at the start of the binlog file. */ @@ -3555,7 +3555,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, flush_io_cache(&log_file); mysql_file_sync(log_file.file, MYF(MY_WME)); DBUG_SUICIDE();); - if (ev.write(&log_file)) + if (write_event(&ev)) goto err; bytes_written+= ev.data_written; } @@ -3587,7 +3587,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, /* Don't set log_pos in event header */ description_event_for_queue->set_artificial_event(); - if (description_event_for_queue->write(&log_file)) + if (write_event(description_event_for_queue)) goto err; bytes_written+= description_event_for_queue->data_written; } @@ -4991,7 +4991,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock) r.checksum_alg= relay_log_checksum_alg; DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) || - (error= r.write(&log_file))) + (error= write_event(&r))) { DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno=2;); close_on_error= TRUE; @@ -5105,9 +5105,13 @@ end: DBUG_RETURN(error); } +bool MYSQL_BIN_LOG::write_event(Log_event *ev, IO_CACHE *file) +{ + Log_event_writer writer(file); + return writer.write(ev); +} -bool -MYSQL_BIN_LOG::append(Log_event *ev) +bool MYSQL_BIN_LOG::append(Log_event *ev) { bool res; mysql_mutex_lock(&LOCK_log); @@ -5124,11 +5128,8 @@ bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev) mysql_mutex_assert_owner(&LOCK_log); DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); - /* - Log_event::write() is smart enough to use my_b_write() or - my_b_append() depending on the kind of cache we have. - */ - if (ev->write(&log_file)) + + if (write_event(ev)) { error=1; goto err; @@ -5516,15 +5517,16 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, IO_CACHE *file= cache_mngr->get_binlog_cache_log(use_trans_cache(this, is_transactional)); + Log_event_writer writer(file); if (with_annotate && *with_annotate) { Annotate_rows_log_event anno(table->in_use, is_transactional, false); /* Annotate event should be written not more than once */ *with_annotate= 0; - if ((error= anno.write(file))) + if ((error= writer.write(&anno))) DBUG_RETURN(error); } - if ((error= the_event.write(file))) + if ((error= writer.write(&the_event))) DBUG_RETURN(error); binlog_table_maps++; @@ -5651,14 +5653,14 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, if (Rows_log_event* pending= cache_data->pending()) { - IO_CACHE *file= &cache_data->cache_log; + Log_event_writer writer(&cache_data->cache_log); /* Write pending event to the cache. */ DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending", {DBUG_SET("+d,simulate_file_write_error");}); - if (pending->write(file)) + if (writer.write(pending)) { set_write_error(thd, is_transactional); if (check_write_error(thd) && cache_data && @@ -5746,7 +5748,8 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, commit_id); /* Write the event to the binary log. */ - if (gtid_event.write(&mysql_bin_log.log_file)) + DBUG_ASSERT(this == &mysql_bin_log); + if (write_event(>id_event)) DBUG_RETURN(true); status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written); @@ -6080,7 +6083,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) Annotate_rows_log_event anno(thd, using_trans, direct); /* Annotate event should be written not more than once */ *with_annotate= 0; - if (anno.write(file)) + if (write_event(&anno, file)) goto err; } @@ -6094,7 +6097,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT, thd->first_successful_insert_id_in_prev_stmt_for_binlog, using_trans, direct); - if (e.write(file)) + if (write_event(&e, file)) goto err; } if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0) @@ -6105,14 +6108,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT, thd->auto_inc_intervals_in_cur_stmt_for_binlog. minimum(), using_trans, direct); - if (e.write(file)) + if (write_event(&e, file)) goto err; } if (thd->rand_used) { Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2, using_trans, direct); - if (e.write(file)) + if (write_event(&e, file)) goto err; } if (thd->user_var_events.elements) @@ -6136,7 +6139,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) flags, using_trans, direct); - if (e.write(file)) + if (write_event(&e, file)) goto err; } } @@ -6146,7 +6149,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) /* Write the event. */ - if (event_info->write(file) || + if (write_event(event_info, file) || DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0)) goto err; @@ -6525,33 +6528,34 @@ uint MYSQL_BIN_LOG::next_file_id() return res; } +class CacheWriter: public Log_event_writer +{ +public: + ulong remains; -/** - Calculate checksum of possibly a part of an event containing at least - the whole common header. - - @param buf the pointer to trans cache's buffer - @param off the offset of the beginning of the event in the buffer - @param event_len no-checksum length of the event - @param length the current size of the buffer - - @param crc [in-out] the checksum + CacheWriter(THD *thd_arg, IO_CACHE *file_arg, bool do_checksum) + : Log_event_writer(file_arg), remains(0), thd(thd_arg), first(true) + { checksum_len= do_checksum ? BINLOG_CHECKSUM_LEN : 0; } - Event size in incremented by @c BINLOG_CHECKSUM_LEN. + ~CacheWriter() + { status_var_add(thd->status_var.binlog_bytes_written, bytes_written); } - @return 0 or number of unprocessed yet bytes of the event excluding - the checksum part. -*/ - static ulong fix_log_event_crc(uchar *buf, uint off, uint event_len, - uint length, ha_checksum *crc) -{ - ulong ret; - uchar *event_begin= buf + off; + int write(uchar* pos, size_t len) + { + if (first) + write_header(pos, len); + else + write_data(pos, len); - ret= length >= off + event_len ? 0 : off + event_len - length; - *crc= my_checksum(*crc, event_begin, event_len - ret); - return ret; -} + remains -= len; + if ((first= !remains)) + write_footer(); + return 0; + } +private: + THD *thd; + bool first; +}; /* Write the contents of a cache to the binary log. @@ -6572,21 +6576,19 @@ uint MYSQL_BIN_LOG::next_file_id() int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) { + DBUG_ENTER("MYSQL_BIN_LOG::write_cache"); + mysql_mutex_assert_owner(&LOCK_log); if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) - return ER_ERROR_ON_WRITE; + DBUG_RETURN(ER_ERROR_ON_WRITE); uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs; - ulong remains= 0; // part of unprocessed yet netto length of the event long val; ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t uchar header[LOG_EVENT_HEADER_LEN]; - ha_checksum crc= 0; - my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF); - uchar buf[BINLOG_CHECKSUM_LEN]; - DBUG_ENTER("MYSQL_BIN_LOG::write_cache"); + CacheWriter writer(thd, &log_file, binlog_checksum_options); // while there is just one alg the following must hold: - DBUG_ASSERT(!do_checksum || + DBUG_ASSERT(binlog_checksum_options == BINLOG_CHECKSUM_ALG_OFF || binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32); /* @@ -6615,53 +6617,40 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) if (unlikely(carry > 0)) { DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN); + uint tail= LOG_EVENT_HEADER_LEN - carry; /* assemble both halves */ - memcpy(&header[carry], (char *)cache->read_pos, - LOG_EVENT_HEADER_LEN - carry); + memcpy(&header[carry], (char *)cache->read_pos, tail); + + ulong len= uint4korr(header + EVENT_LEN_OFFSET); + writer.remains= len; /* fix end_log_pos */ - val= uint4korr(&header[LOG_POS_OFFSET]) + group + - (end_log_pos_inc+= (do_checksum ? BINLOG_CHECKSUM_LEN : 0)); - int4store(&header[LOG_POS_OFFSET], val); + end_log_pos_inc += writer.checksum_len; + val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc; + int4store(header + LOG_POS_OFFSET, val); - if (do_checksum) - { - ulong len= uint4korr(&header[EVENT_LEN_OFFSET]); - /* fix len */ - int4store(&header[EVENT_LEN_OFFSET], len + BINLOG_CHECKSUM_LEN); - } + /* fix len */ + len+= writer.checksum_len; + int4store(header + EVENT_LEN_OFFSET, len); - /* write the first half of the split header */ - if (my_b_write(&log_file, header, carry)) + if (writer.write(header, LOG_EVENT_HEADER_LEN)) DBUG_RETURN(ER_ERROR_ON_WRITE); - status_var_add(thd->status_var.binlog_bytes_written, carry); - /* - copy fixed second half of header to cache so the correct - version will be written later. - */ - memcpy((char *)cache->read_pos, &header[carry], - LOG_EVENT_HEADER_LEN - carry); + cache->read_pos+= tail; + length-= tail; + carry= 0; /* next event header at ... */ - hdr_offs= uint4korr(&header[EVENT_LEN_OFFSET]) - carry - - (do_checksum ? BINLOG_CHECKSUM_LEN : 0); - - if (do_checksum) - { - DBUG_ASSERT(crc == 0 && remains == 0); - crc= my_checksum(crc, header, carry); - remains= uint4korr(header + EVENT_LEN_OFFSET) - carry - - BINLOG_CHECKSUM_LEN; - } - carry= 0; + hdr_offs= len - LOG_EVENT_HEADER_LEN - writer.checksum_len; } /* if there is anything to write, process it. */ if (likely(length > 0)) { + DBUG_EXECUTE_IF("fail_binlog_write_1", + errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE);); /* process all event-headers in this (partial) cache. if next header is beyond current read-buffer, @@ -6669,52 +6658,28 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) very next iteration, just "eventually"). */ - /* crc-calc the whole buffer */ - if (do_checksum && hdr_offs >= length) + if (hdr_offs >= length) { - - DBUG_ASSERT(remains != 0 && crc != 0); - - crc= my_checksum(crc, cache->read_pos, length); - remains -= length; - if (my_b_write(&log_file, cache->read_pos, length)) + if (writer.write(cache->read_pos, length)) DBUG_RETURN(ER_ERROR_ON_WRITE); - if (remains == 0) - { - int4store(buf, crc); - if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - crc= 0; - } } while (hdr_offs < length) { /* - partial header only? save what we can get, process once - we get the rest. + finish off with remains of the last event that crawls + from previous into the current buffer */ - - if (do_checksum) + if (writer.remains != 0) { - if (remains != 0) - { - /* - finish off with remains of the last event that crawls - from previous into the current buffer - */ - DBUG_ASSERT(crc != 0); - crc= my_checksum(crc, cache->read_pos, hdr_offs); - int4store(buf, crc); - remains -= hdr_offs; - DBUG_ASSERT(remains == 0); - if (my_b_write(&log_file, cache->read_pos, hdr_offs) || - my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - crc= 0; - } + if (writer.write(cache->read_pos, hdr_offs)) + DBUG_RETURN(ER_ERROR_ON_WRITE); } + /* + partial header only? save what we can get, process once + we get the rest. + */ if (hdr_offs + LOG_EVENT_HEADER_LEN > length) { carry= length - hdr_offs; @@ -6725,37 +6690,25 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) { /* we've got a full event-header, and it came in one piece */ uchar *ev= (uchar *)cache->read_pos + hdr_offs; - uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len + uint ev_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len uchar *log_pos= ev + LOG_POS_OFFSET; + end_log_pos_inc += writer.checksum_len; /* fix end_log_pos */ - val= uint4korr(log_pos) + group + - (end_log_pos_inc += (do_checksum ? BINLOG_CHECKSUM_LEN : 0)); + val= uint4korr(log_pos) + group + end_log_pos_inc; int4store(log_pos, val); - /* fix CRC */ - if (do_checksum) - { - /* fix length */ - int4store(ev + EVENT_LEN_OFFSET, event_len + BINLOG_CHECKSUM_LEN); - remains= fix_log_event_crc(cache->read_pos, hdr_offs, event_len, - length, &crc); - if (my_b_write(&log_file, ev, - remains == 0 ? event_len : length - hdr_offs)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - if (remains == 0) - { - int4store(buf, crc); - if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - crc= 0; // crc is complete - } - } + /* fix length */ + int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len); + + writer.remains= ev_len; + if (writer.write(ev, std::min<uint>(ev_len, length - hdr_offs))) + DBUG_RETURN(ER_ERROR_ON_WRITE); /* next event header at ... */ - hdr_offs += event_len; // incr by the netto len + hdr_offs += ev_len; // incr by the netto len - DBUG_ASSERT(!do_checksum || remains == 0 || hdr_offs >= length); + DBUG_ASSERT(!writer.checksum_len || writer.remains == 0 || hdr_offs >= length); } } @@ -6769,20 +6722,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) */ hdr_offs -= length; } - - /* Write data to the binary log file */ - DBUG_EXECUTE_IF("fail_binlog_write_1", - errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE);); - if (!do_checksum) - if (my_b_write(&log_file, cache->read_pos, length)) - DBUG_RETURN(ER_ERROR_ON_WRITE); - status_var_add(thd->status_var.binlog_bytes_written, length); - } while ((length= my_b_fill(cache))); DBUG_ASSERT(carry == 0); - DBUG_ASSERT(!do_checksum || remains == 0); - DBUG_ASSERT(!do_checksum || crc == 0); + DBUG_ASSERT(!writer.checksum_len || writer.remains == 0); DBUG_RETURN(0); // All OK } @@ -6827,7 +6770,7 @@ bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd) if (likely(is_open())) { - error= ev.write(&log_file); + error= write_event(&ev); status_var_add(thd->status_var.binlog_bytes_written, ev.data_written); } @@ -6889,7 +6832,7 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name_arg Otherwise a subsequent log purge could delete binlogs that XA recovery thinks are needed (even though they are not really). */ - if (!ev.write(&log_file) && !flush_and_sync(0)) + if (!write_event(&ev) && !flush_and_sync(0)) { signal_update(); } @@ -7797,7 +7740,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, DBUG_RETURN(ER_ERROR_ON_WRITE); }); - if (entry->end_event->write(&log_file)) + if (write_event(entry->end_event)) { entry->error_cache= NULL; DBUG_RETURN(ER_ERROR_ON_WRITE); @@ -7807,7 +7750,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, if (entry->incident_event) { - if (entry->incident_event->write(&log_file)) + if (write_event(entry->incident_event)) { entry->error_cache= NULL; DBUG_RETURN(ER_ERROR_ON_WRITE); @@ -8066,7 +8009,7 @@ void MYSQL_BIN_LOG::close(uint exiting) : (enum_binlog_checksum_alg)binlog_checksum_options; DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); - s.write(&log_file); + write_event(&s); bytes_written+= s.data_written; signal_update(); diff --git a/sql/log.h b/sql/log.h index fcec606443f..31cb4b2f0ae 100644 --- a/sql/log.h +++ b/sql/log.h @@ -737,6 +737,9 @@ public: void stop_union_events(THD *thd); bool is_query_in_union(THD *thd, query_id_t query_id_param); + bool write_event(Log_event *ev, IO_CACHE *file); + bool write_event(Log_event *ev) { return write_event(ev, &log_file); } + /* v stands for vector invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0) diff --git a/sql/log_event.cc b/sql/log_event.cc index 20ce96c2f7b..8617e35953e 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -668,19 +668,6 @@ static void cleanup_load_tmpdir(LEX_STRING *connection_name) /* - write_str() -*/ - -static bool write_str(IO_CACHE *file, const char *str, uint length) -{ - uchar tmp[1]; - tmp[0]= (uchar) length; - return (my_b_safe_write(file, tmp, sizeof(tmp)) || - my_b_safe_write(file, (uchar*) str, length)); -} - - -/* read_str() */ @@ -845,8 +832,7 @@ const char* Log_event::get_type_str() #ifndef MYSQL_CLIENT Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) - :log_pos(0), temp_buf(0), exec_time(0), - crc(0), thd(thd_arg), + :log_pos(0), temp_buf(0), exec_time(0), thd(thd_arg), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) { server_id= thd->variables.server_id; @@ -870,8 +856,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) */ Log_event::Log_event() - :temp_buf(0), exec_time(0), flags(0), - cache_type(Log_event::EVENT_INVALID_CACHE), crc(0), + :temp_buf(0), exec_time(0), flags(0), cache_type(EVENT_INVALID_CACHE), thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) { server_id= global_system_variables.server_id; @@ -893,7 +878,7 @@ Log_event::Log_event() Log_event::Log_event(const char* buf, const Format_description_log_event* description_event) :temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE), - crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) + checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) { #ifndef MYSQL_CLIENT thd = 0; @@ -1159,26 +1144,51 @@ my_bool Log_event::need_checksum() DBUG_RETURN(ret); } -bool Log_event::wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong size) +int Log_event_writer::write_internal(const uchar *pos, size_t len) { - if (need_checksum() && size != 0) - crc= my_checksum(crc, buf, size); - - return my_b_safe_write(file, buf, size); + if (my_b_safe_write(file, pos, len)) + return 1; + bytes_written+= len; + return 0; } -bool Log_event::write_footer(IO_CACHE* file) +int Log_event_writer::write_header(uchar *pos, size_t len) { DBUG_ENTER("write_footer"); /* - (optional) footer contains the checksum value + recording checksum of FD event computed with dropped + possibly active LOG_EVENT_BINLOG_IN_USE_F flag. + Similar step at verication: the active flag is dropped before + checksum computing. */ - if (need_checksum()) + if (checksum_len) + { + uchar save=pos[FLAGS_OFFSET]; + pos[FLAGS_OFFSET]&= ~LOG_EVENT_BINLOG_IN_USE_F; + crc= my_checksum(0, pos, len); + pos[FLAGS_OFFSET]= save; + } + + return write_internal(pos, len); +} + +int Log_event_writer::write_data(const uchar *pos, size_t len) +{ + if (checksum_len) + crc= my_checksum(crc, pos, len); + + return write_internal(pos, len); +} + +int Log_event_writer::write_footer() +{ + DBUG_ENTER("Log_event_writer::write_footer"); + if (checksum_len) { - DBUG_PRINT("info", ("Writing checksum")); - uchar buf[BINLOG_CHECKSUM_LEN]; - int4store(buf, crc); - DBUG_RETURN(my_b_safe_write(file, (uchar*) buf, sizeof(buf))); + uchar checksum_buf[BINLOG_CHECKSUM_LEN]; + int4store(checksum_buf, crc); + if (write_internal(checksum_buf, BINLOG_CHECKSUM_LEN)) + DBUG_RETURN(ER_ERROR_ON_WRITE); } DBUG_RETURN(0); } @@ -1187,24 +1197,19 @@ bool Log_event::write_footer(IO_CACHE* file) Log_event::write_header() */ -bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) +bool Log_event::write_header(ulong event_data_length) { uchar header[LOG_EVENT_HEADER_LEN]; ulong now; - bool ret; DBUG_ENTER("Log_event::write_header"); DBUG_PRINT("enter", ("filepos: %lld length: %lu type: %d", - (longlong) my_b_tell(file), event_data_length, + (longlong) writer->pos(), event_data_length, (int) get_type_code())); - /* Store number of bytes that will be written by this event */ - data_written= event_data_length + sizeof(header); + writer->checksum_len= need_checksum() ? BINLOG_CHECKSUM_LEN : 0; - if (need_checksum()) - { - crc= 0; - data_written += BINLOG_CHECKSUM_LEN; - } + /* Store number of bytes that will be written by this event */ + data_written= event_data_length + sizeof(header) + writer->checksum_len; /* log_pos != 0 if this is relay-log event. In this case we should not @@ -1226,7 +1231,7 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) (end of this event, that is). */ - log_pos= my_b_safe_tell(file)+data_written; + log_pos= writer->pos() + data_written; } now= get_time(); // Query start time @@ -1243,36 +1248,10 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) int4store(header+ SERVER_ID_OFFSET, server_id); int4store(header+ EVENT_LEN_OFFSET, data_written); int4store(header+ LOG_POS_OFFSET, log_pos); - /* - recording checksum of FD event computed with dropped - possibly active LOG_EVENT_BINLOG_IN_USE_F flag. - Similar step at verication: the active flag is dropped before - checksum computing. - */ - if (header[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT || - !need_checksum() || !(flags & LOG_EVENT_BINLOG_IN_USE_F)) - { - int2store(header+ FLAGS_OFFSET, flags); - ret= wrapper_my_b_safe_write(file, header, sizeof(header)) != 0; - } - else - { - ret= (wrapper_my_b_safe_write(file, header, FLAGS_OFFSET) != 0); - if (!ret) - { - flags &= ~LOG_EVENT_BINLOG_IN_USE_F; - int2store(header + FLAGS_OFFSET, flags); - crc= my_checksum(crc, header + FLAGS_OFFSET, sizeof(flags)); - flags |= LOG_EVENT_BINLOG_IN_USE_F; - int2store(header + FLAGS_OFFSET, flags); - ret= (my_b_safe_write(file, header + FLAGS_OFFSET, sizeof(flags)) != 0); - } - if (!ret) - ret= (wrapper_my_b_safe_write(file, header + FLAGS_OFFSET + sizeof(flags), - sizeof(header) - - (FLAGS_OFFSET + sizeof(flags))) != 0); - } - DBUG_RETURN( ret); + int2store(header + FLAGS_OFFSET, flags); + + bool ret= writer->write_header(header, sizeof(header)); + DBUG_RETURN(ret); } #endif /* !MYSQL_CLIENT */ @@ -1664,9 +1643,11 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, if (ev) { ev->checksum_alg= alg; +#ifdef MYSQL_CLIENT if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF && ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ev->crc= uint4korr(buf + (event_len)); +#endif } DBUG_PRINT("read_event", ("%s(type_code: %u; event_len: %u)", @@ -2749,7 +2730,7 @@ static void store_str_with_code_and_len(uchar **dst, const char *src, will print! */ -bool Query_log_event::write(IO_CACHE* file) +bool Query_log_event::write() { uchar buf[QUERY_HEADER_LEN + MAX_SIZE_LOG_EVENT_STATUS]; uchar *start, *start_of_status; @@ -2978,14 +2959,13 @@ bool Query_log_event::write(IO_CACHE* file) */ event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len; - return (write_header(file, event_length) || - wrapper_my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) || - write_post_header_for_derived(file) || - wrapper_my_b_safe_write(file, (uchar*) start_of_status, - (uint) (start-start_of_status)) || - wrapper_my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) || - wrapper_my_b_safe_write(file, (uchar*) query, q_len) || - write_footer(file)) ? 1 : 0; + return write_header(event_length) || + write_data(buf, QUERY_HEADER_LEN) || + write_post_header_for_derived() || + write_data(start_of_status, (uint) (start-start_of_status)) || + write_data(safe_str(db), db_len + 1) || + write_data(query, q_len) || + write_footer(); } /** @@ -4610,7 +4590,7 @@ Start_log_event_v3::Start_log_event_v3(const char* buf, uint event_len, */ #ifndef MYSQL_CLIENT -bool Start_log_event_v3::write(IO_CACHE* file) +bool Start_log_event_v3::write() { char buff[START_V3_HEADER_LEN]; int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); @@ -4618,9 +4598,9 @@ bool Start_log_event_v3::write(IO_CACHE* file) if (!dont_set_created) created= get_time(); // this sets when and when_sec_part as a side effect int4store(buff + ST_CREATED_OFFSET,created); - return (write_header(file, sizeof(buff)) || - wrapper_my_b_safe_write(file, (uchar*) buff, sizeof(buff)) || - write_footer(file)); + return write_header(sizeof(buff)) || + write_data(buff, sizeof(buff)) || + write_footer(); } #endif @@ -4933,7 +4913,7 @@ Format_description_log_event(const char* buf, } #ifndef MYSQL_CLIENT -bool Format_description_log_event::write(IO_CACHE* file) +bool Format_description_log_event::write() { bool ret; bool no_checksum; @@ -4981,12 +4961,11 @@ bool Format_description_log_event::write(IO_CACHE* file) { checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway } - ret= (write_header(file, rec_size) || - wrapper_my_b_safe_write(file, buff, sizeof(buff)) || - wrapper_my_b_safe_write(file, (uchar*)post_header_len, - number_of_event_types) || - wrapper_my_b_safe_write(file, &checksum_byte, sizeof(checksum_byte)) || - write_footer(file)); + ret= write_header(rec_size) || + write_data(buff, sizeof(buff)) || + write_data(post_header_len, number_of_event_types) || + write_data(&checksum_byte, sizeof(checksum_byte)) || + write_footer(); if (no_checksum) checksum_alg= BINLOG_CHECKSUM_ALG_OFF; return ret; @@ -5332,7 +5311,7 @@ void Load_log_event::pack_info(THD *thd, Protocol *protocol) Load_log_event::write_data_header() */ -bool Load_log_event::write_data_header(IO_CACHE* file) +bool Load_log_event::write_data_header() { char buf[LOAD_HEADER_LEN]; int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id); @@ -5341,7 +5320,7 @@ bool Load_log_event::write_data_header(IO_CACHE* file) buf[L_TBL_LEN_OFFSET] = (char)table_name_len; buf[L_DB_LEN_OFFSET] = (char)db_len; int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); - return my_b_safe_write(file, (uchar*)buf, LOAD_HEADER_LEN) != 0; + return write_data(buf, LOAD_HEADER_LEN) != 0; } @@ -5349,19 +5328,19 @@ bool Load_log_event::write_data_header(IO_CACHE* file) Load_log_event::write_data_body() */ -bool Load_log_event::write_data_body(IO_CACHE* file) +bool Load_log_event::write_data_body() { - if (sql_ex.write_data(file)) + if (sql_ex.write_data(writer)) return 1; if (num_fields && fields && field_lens) { - if (my_b_safe_write(file, (uchar*)field_lens, num_fields) || - my_b_safe_write(file, (uchar*)fields, field_block_len)) + if (write_data(field_lens, num_fields) || + write_data(fields, field_block_len)) return 1; } - return (my_b_safe_write(file, (uchar*)table_name, table_name_len + 1) || - my_b_safe_write(file, (uchar*)db, db_len + 1) || - my_b_safe_write(file, (uchar*)fname, fname_len)); + return (write_data(table_name, table_name_len + 1) || + write_data(db, db_len + 1) || + write_data(fname, fname_len)); } @@ -6078,15 +6057,14 @@ Rotate_log_event::Rotate_log_event(const char* buf, uint event_len, */ #ifndef MYSQL_CLIENT -bool Rotate_log_event::write(IO_CACHE* file) +bool Rotate_log_event::write() { char buf[ROTATE_HEADER_LEN]; int8store(buf + R_POS_OFFSET, pos); - return (write_header(file, ROTATE_HEADER_LEN + ident_len) || - wrapper_my_b_safe_write(file, (uchar*) buf, ROTATE_HEADER_LEN) || - wrapper_my_b_safe_write(file, (uchar*) new_log_ident, - (uint) ident_len) || - write_footer(file)); + return (write_header(ROTATE_HEADER_LEN + ident_len) || + write_data(buf, ROTATE_HEADER_LEN) || + write_data(new_log_ident, (uint) ident_len) || + write_footer()); } #endif @@ -6276,15 +6254,14 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event( #ifndef MYSQL_CLIENT -bool Binlog_checkpoint_log_event::write(IO_CACHE *file) +bool Binlog_checkpoint_log_event::write() { uchar buf[BINLOG_CHECKPOINT_HEADER_LEN]; int4store(buf, binlog_file_len); - return write_header(file, BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) || - wrapper_my_b_safe_write(file, buf, BINLOG_CHECKPOINT_HEADER_LEN) || - wrapper_my_b_safe_write(file, (const uchar *)binlog_file_name, - binlog_file_len) || - write_footer(file); + return write_header(BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) || + write_data(buf, BINLOG_CHECKPOINT_HEADER_LEN) || + write_data(binlog_file_name, binlog_file_len) || + write_footer(); } #endif /* MYSQL_CLIENT */ @@ -6386,7 +6363,7 @@ Gtid_log_event::peek(const char *event_start, size_t event_len, bool -Gtid_log_event::write(IO_CACHE *file) +Gtid_log_event::write() { uchar buf[GTID_HEADER_LEN+2]; size_t write_len; @@ -6404,9 +6381,9 @@ Gtid_log_event::write(IO_CACHE *file) bzero(buf+13, GTID_HEADER_LEN-13); write_len= GTID_HEADER_LEN; } - return write_header(file, write_len) || - wrapper_my_b_safe_write(file, buf, write_len) || - write_footer(file); + return write_header(write_len) || + write_data(buf, write_len) || + write_footer(); } @@ -6764,7 +6741,7 @@ Gtid_list_log_event::to_packet(String *packet) bool -Gtid_list_log_event::write(IO_CACHE *file) +Gtid_list_log_event::write() { char buf[128]; String packet(buf, sizeof(buf), system_charset_info); @@ -6772,10 +6749,9 @@ Gtid_list_log_event::write(IO_CACHE *file) packet.length(0); if (to_packet(&packet)) return true; - return - write_header(file, get_data_size()) || - wrapper_my_b_safe_write(file, (uchar *)packet.ptr(), packet.length()) || - write_footer(file); + return write_header(get_data_size()) || + write_data(packet.ptr(), packet.length()) || + write_footer(); } @@ -6980,14 +6956,14 @@ const char* Intvar_log_event::get_var_type_name() */ #ifndef MYSQL_CLIENT -bool Intvar_log_event::write(IO_CACHE* file) +bool Intvar_log_event::write() { uchar buf[9]; buf[I_TYPE_OFFSET]= (uchar) type; int8store(buf + I_VAL_OFFSET, val); - return (write_header(file, sizeof(buf)) || - wrapper_my_b_safe_write(file, buf, sizeof(buf)) || - write_footer(file)); + return write_header(sizeof(buf)) || + write_data(buf, sizeof(buf)) || + write_footer(); } #endif @@ -7110,14 +7086,14 @@ Rand_log_event::Rand_log_event(const char* buf, #ifndef MYSQL_CLIENT -bool Rand_log_event::write(IO_CACHE* file) +bool Rand_log_event::write() { uchar buf[16]; int8store(buf + RAND_SEED1_OFFSET, seed1); int8store(buf + RAND_SEED2_OFFSET, seed2); - return (write_header(file, sizeof(buf)) || - wrapper_my_b_safe_write(file, buf, sizeof(buf)) || - write_footer(file)); + return write_header(sizeof(buf)) || + write_data(buf, sizeof(buf)) || + write_footer(); } #endif @@ -7236,12 +7212,12 @@ Xid_log_event(const char* buf, #ifndef MYSQL_CLIENT -bool Xid_log_event::write(IO_CACHE* file) +bool Xid_log_event::write() { DBUG_EXECUTE_IF("do_not_write_xid", return 0;); - return (write_header(file, sizeof(xid)) || - wrapper_my_b_safe_write(file, (uchar*) &xid, sizeof(xid)) || - write_footer(file)); + return write_header(sizeof(xid)) || + write_data((uchar*)&xid, sizeof(xid)) || + write_footer(); } #endif @@ -7587,7 +7563,7 @@ err: #ifndef MYSQL_CLIENT -bool User_var_log_event::write(IO_CACHE* file) +bool User_var_log_event::write() { char buf[UV_NAME_LEN_SIZE]; char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + @@ -7642,13 +7618,13 @@ bool User_var_log_event::write(IO_CACHE* file) /* Length of the whole event */ event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len; - return (write_header(file, event_length) || - wrapper_my_b_safe_write(file, (uchar*) buf, sizeof(buf)) || - wrapper_my_b_safe_write(file, (uchar*) name, name_len) || - wrapper_my_b_safe_write(file, (uchar*) buf1, buf1_length) || - wrapper_my_b_safe_write(file, pos, val_len) || - wrapper_my_b_safe_write(file, &flags, unsigned_len) || - write_footer(file)); + return write_header(event_length) || + write_data(buf, sizeof(buf)) || + write_data(name, name_len) || + write_data(buf1, buf1_length) || + write_data(pos, val_len) || + write_data(&flags, unsigned_len) || + write_footer(); } #endif @@ -7986,13 +7962,13 @@ Create_file_log_event(THD* thd_arg, sql_exchange* ex, Create_file_log_event::write_data_body() */ -bool Create_file_log_event::write_data_body(IO_CACHE* file) +bool Create_file_log_event::write_data_body() { bool res; - if ((res= Load_log_event::write_data_body(file)) || fake_base) + if ((res= Load_log_event::write_data_body()) || fake_base) return res; - return (my_b_safe_write(file, (uchar*) "", 1) || - my_b_safe_write(file, (uchar*) block, block_len)); + return write_data("", 1) || + write_data(block, block_len); } @@ -8000,14 +7976,14 @@ bool Create_file_log_event::write_data_body(IO_CACHE* file) Create_file_log_event::write_data_header() */ -bool Create_file_log_event::write_data_header(IO_CACHE* file) +bool Create_file_log_event::write_data_header() { bool res; uchar buf[CREATE_FILE_HEADER_LEN]; - if ((res= Load_log_event::write_data_header(file)) || fake_base) + if ((res= Load_log_event::write_data_header()) || fake_base) return res; int4store(buf + CF_FILE_ID_OFFSET, file_id); - return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0; + return write_data(buf, CREATE_FILE_HEADER_LEN) != 0; } @@ -8015,11 +7991,11 @@ bool Create_file_log_event::write_data_header(IO_CACHE* file) Create_file_log_event::write_base() */ -bool Create_file_log_event::write_base(IO_CACHE* file) +bool Create_file_log_event::write_base() { bool res; fake_base= 1; // pretend we are Load event - res= write(file); + res= write(); fake_base= 0; return res; } @@ -8166,6 +8142,7 @@ int Create_file_log_event::do_apply_event(rpl_group_info *rgi) char *ext; int fd = -1; IO_CACHE file; + Log_event_writer lew(&file); int error = 1; Relay_log_info const *rli= rgi->rli; @@ -8191,7 +8168,8 @@ int Create_file_log_event::do_apply_event(rpl_group_info *rgi) // a trick to avoid allocating another buffer fname= fname_buf; fname_len= (uint) (strmov(ext, ".data") - fname); - if (write_base(&file)) + writer= &lew; + if (write_base()) { strmov(ext, ".info"); // to have it right in the error message rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(), @@ -8282,14 +8260,14 @@ Append_block_log_event::Append_block_log_event(const char* buf, uint len, */ #ifndef MYSQL_CLIENT -bool Append_block_log_event::write(IO_CACHE* file) +bool Append_block_log_event::write() { uchar buf[APPEND_BLOCK_HEADER_LEN]; int4store(buf + AB_FILE_ID_OFFSET, file_id); - return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) || - wrapper_my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) || - wrapper_my_b_safe_write(file, (uchar*) block, block_len) || - write_footer(file)); + return write_header(APPEND_BLOCK_HEADER_LEN + block_len) || + write_data(buf, APPEND_BLOCK_HEADER_LEN) || + write_data(block, block_len) || + write_footer(); } #endif @@ -8442,13 +8420,13 @@ Delete_file_log_event::Delete_file_log_event(const char* buf, uint len, */ #ifndef MYSQL_CLIENT -bool Delete_file_log_event::write(IO_CACHE* file) +bool Delete_file_log_event::write() { uchar buf[DELETE_FILE_HEADER_LEN]; int4store(buf + DF_FILE_ID_OFFSET, file_id); - return (write_header(file, sizeof(buf)) || - wrapper_my_b_safe_write(file, buf, sizeof(buf)) || - write_footer(file)); + return write_header(sizeof(buf)) || + write_data(buf, sizeof(buf)) || + write_footer(); } #endif @@ -8542,13 +8520,13 @@ Execute_load_log_event::Execute_load_log_event(const char* buf, uint len, */ #ifndef MYSQL_CLIENT -bool Execute_load_log_event::write(IO_CACHE* file) +bool Execute_load_log_event::write() { uchar buf[EXEC_LOAD_HEADER_LEN]; int4store(buf + EL_FILE_ID_OFFSET, file_id); - return (write_header(file, sizeof(buf)) || - wrapper_my_b_safe_write(file, buf, sizeof(buf)) || - write_footer(file)); + return write_header(sizeof(buf)) || + write_data(buf, sizeof(buf)) || + write_footer(); } #endif @@ -8777,14 +8755,14 @@ ulong Execute_load_query_log_event::get_post_header_size_for_derived() #ifndef MYSQL_CLIENT bool -Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file) +Execute_load_query_log_event::write_post_header_for_derived() { uchar buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN]; int4store(buf, file_id); int4store(buf + 4, fn_pos_start); int4store(buf + 4 + 4, fn_pos_end); *(buf + 4 + 4 + 4)= (uchar) dup_handling; - return wrapper_my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); + return write_data(buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); } #endif @@ -8928,40 +8906,6 @@ Execute_load_query_log_event::do_apply_event(rpl_group_info *rgi) **************************************************************************/ /* - sql_ex_info::write_data() -*/ - -bool sql_ex_info::write_data(IO_CACHE* file) -{ - if (new_format()) - { - return (write_str(file, field_term, (uint) field_term_len) || - write_str(file, enclosed, (uint) enclosed_len) || - write_str(file, line_term, (uint) line_term_len) || - write_str(file, line_start, (uint) line_start_len) || - write_str(file, escaped, (uint) escaped_len) || - my_b_safe_write(file,(uchar*) &opt_flags,1)); - } - else - { - /** - @todo This is sensitive to field padding. We should write a - char[7], not an old_sql_ex. /sven - */ - old_sql_ex old_ex; - old_ex.field_term= *field_term; - old_ex.enclosed= *enclosed; - old_ex.line_term= *line_term; - old_ex.line_start= *line_start; - old_ex.escaped= *escaped; - old_ex.opt_flags= opt_flags; - old_ex.empty_flags=empty_flags; - return my_b_safe_write(file, (uchar*) &old_ex, sizeof(old_ex)) != 0; - } -} - - -/* sql_ex_info::init() */ @@ -9011,12 +8955,54 @@ const char *sql_ex_info::init(const char *buf, const char *buf_end, return buf; } +#ifndef MYSQL_CLIENT +/* + write_str() +*/ + +static bool write_str(Log_event_writer *writer, const char *str, uint length) +{ + uchar tmp[1]; + tmp[0]= (uchar) length; + return (writer->write_data(tmp, sizeof(tmp)) || + writer->write_data((uchar*) str, length)); +} + +/* + sql_ex_info::write_data() +*/ + +bool sql_ex_info::write_data(Log_event_writer *writer) +{ + if (new_format()) + { + return write_str(writer, field_term, field_term_len) || + write_str(writer, enclosed, enclosed_len) || + write_str(writer, line_term, line_term_len) || + write_str(writer, line_start, line_start_len) || + write_str(writer, escaped, escaped_len) || + writer->write_data((uchar*) &opt_flags, 1); + } + else + { + uchar old_ex[7]; + old_ex[0]= *field_term; + old_ex[1]= *enclosed; + old_ex[2]= *line_term; + old_ex[3]= *line_start; + old_ex[4]= *escaped; + old_ex[5]= opt_flags; + old_ex[6]= empty_flags; + return writer->write_data(old_ex, sizeof(old_ex)); + } +} + + /************************************************************************** Rows_log_event member functions **************************************************************************/ -#ifndef MYSQL_CLIENT Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, MY_BITMAP const *cols, bool is_transactional, Log_event_type event_type) @@ -9965,7 +9951,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi) #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ #ifndef MYSQL_CLIENT -bool Rows_log_event::write_data_header(IO_CACHE *file) +bool Rows_log_event::write_data_header() { uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer DBUG_ASSERT(m_table_id != ~0UL); @@ -9973,14 +9959,14 @@ bool Rows_log_event::write_data_header(IO_CACHE *file) { int4store(buf + 0, m_table_id); int2store(buf + 4, m_flags); - return (wrapper_my_b_safe_write(file, buf, 6)); + return (write_data(buf, 6)); }); int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id); int2store(buf + RW_FLAGS_OFFSET, m_flags); - return (wrapper_my_b_safe_write(file, buf, ROWS_HEADER_LEN)); + return write_data(buf, ROWS_HEADER_LEN); } -bool Rows_log_event::write_data_body(IO_CACHE*file) +bool Rows_log_event::write_data_body() { /* Note that this should be the number of *bits*, not the number of @@ -9993,11 +9979,10 @@ bool Rows_log_event::write_data_body(IO_CACHE*file) DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf)); - res= res || wrapper_my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf)); + res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf)); DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); - res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols.bitmap, - no_bytes_in_map(&m_cols)); + res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols)); /* TODO[refactor write]: Remove the "down cast" here (and elsewhere). */ @@ -10005,11 +9990,11 @@ bool Rows_log_event::write_data_body(IO_CACHE*file) { DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap, no_bytes_in_map(&m_cols_ai)); - res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols_ai.bitmap, - no_bytes_in_map(&m_cols_ai)); + res= res || write_data((uchar*)m_cols_ai.bitmap, + no_bytes_in_map(&m_cols_ai)); } DBUG_DUMP("rows", m_rows_buf, data_size); - res= res || wrapper_my_b_safe_write(file, m_rows_buf, (size_t) data_size); + res= res || write_data(m_rows_buf, (size_t) data_size); return res; @@ -10107,16 +10092,16 @@ bool Annotate_rows_log_event::is_valid() const } #ifndef MYSQL_CLIENT -bool Annotate_rows_log_event::write_data_header(IO_CACHE *file) +bool Annotate_rows_log_event::write_data_header() { return 0; } #endif #ifndef MYSQL_CLIENT -bool Annotate_rows_log_event::write_data_body(IO_CACHE *file) +bool Annotate_rows_log_event::write_data_body() { - return wrapper_my_b_safe_write(file, (uchar*) m_query_txt, m_query_len); + return write_data(m_query_txt, m_query_len); } #endif @@ -10826,7 +10811,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi) #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ #ifndef MYSQL_CLIENT -bool Table_map_log_event::write_data_header(IO_CACHE *file) +bool Table_map_log_event::write_data_header() { DBUG_ASSERT(m_table_id != ~0UL); uchar buf[TABLE_MAP_HEADER_LEN]; @@ -10834,14 +10819,14 @@ bool Table_map_log_event::write_data_header(IO_CACHE *file) { int4store(buf + 0, m_table_id); int2store(buf + 4, m_flags); - return (wrapper_my_b_safe_write(file, buf, 6)); + return (write_data(buf, 6)); }); int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id); int2store(buf + TM_FLAGS_OFFSET, m_flags); - return (wrapper_my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN)); + return write_data(buf, TABLE_MAP_HEADER_LEN); } -bool Table_map_log_event::write_data_body(IO_CACHE *file) +bool Table_map_log_event::write_data_body() { DBUG_ASSERT(m_dbnam != NULL); DBUG_ASSERT(m_tblnam != NULL); @@ -10862,15 +10847,15 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file) uchar mbuf[MAX_INT_WIDTH]; uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size); - return (wrapper_my_b_safe_write(file, dbuf, sizeof(dbuf)) || - wrapper_my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) || - wrapper_my_b_safe_write(file, tbuf, sizeof(tbuf)) || - wrapper_my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) || - wrapper_my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) || - wrapper_my_b_safe_write(file, m_coltype, m_colcnt) || - wrapper_my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) || - wrapper_my_b_safe_write(file, m_field_metadata, m_field_metadata_size), - wrapper_my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8)); + return write_data(dbuf, sizeof(dbuf)) || + write_data(m_dbnam, m_dblen+1) || + write_data(tbuf, sizeof(tbuf)) || + write_data(m_tblnam, m_tbllen+1) || + write_data(cbuf, (size_t) (cbuf_end - cbuf)) || + write_data(m_coltype, m_colcnt) || + write_data(mbuf, (size_t) (mbuf_end - mbuf)) || + write_data(m_field_metadata, m_field_metadata_size), + write_data(m_null_bits, (m_colcnt + 7) / 8); } #endif @@ -12432,35 +12417,27 @@ Incident_log_event::do_apply_event(rpl_group_info *rgi) } #endif +#ifdef MYSQL_SERVER bool -Incident_log_event::write_data_header(IO_CACHE *file) +Incident_log_event::write_data_header() { DBUG_ENTER("Incident_log_event::write_data_header"); DBUG_PRINT("enter", ("m_incident: %d", m_incident)); uchar buf[sizeof(int16)]; int2store(buf, (int16) m_incident); -#ifndef MYSQL_CLIENT - DBUG_RETURN(wrapper_my_b_safe_write(file, buf, sizeof(buf))); -#else - DBUG_RETURN(my_b_safe_write(file, buf, sizeof(buf))); -#endif + DBUG_RETURN(write_data(buf, sizeof(buf))); } bool -Incident_log_event::write_data_body(IO_CACHE *file) +Incident_log_event::write_data_body() { uchar tmp[1]; DBUG_ENTER("Incident_log_event::write_data_body"); tmp[0]= (uchar) m_message.length; - crc= my_checksum(crc, (uchar*) tmp, 1); - if (m_message.length > 0) - { - crc= my_checksum(crc, (uchar*) m_message.str, m_message.length); - // todo: report a bug on write_str accepts uint but treats it as uchar - } - DBUG_RETURN(write_str(file, m_message.str, (uint) m_message.length)); + DBUG_RETURN(write_data(tmp, sizeof(tmp)) || + write_data(m_message.str, m_message.length)); } - +#endif #ifdef MYSQL_CLIENT /** diff --git a/sql/log_event.h b/sql/log_event.h index 4c2dca14f3b..77b3d1f5216 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -145,64 +145,10 @@ class String; #define LINE_START_EMPTY 0x8 #define ESCAPED_EMPTY 0x10 -/***************************************************************************** - - old_sql_ex struct - - ****************************************************************************/ -struct old_sql_ex -{ - char field_term; - char enclosed; - char line_term; - char line_start; - char escaped; - char opt_flags; - char empty_flags; -}; - #define NUM_LOAD_DELIM_STRS 5 /***************************************************************************** - sql_ex_info struct - - ****************************************************************************/ -struct sql_ex_info -{ - sql_ex_info() {} /* Remove gcc warning */ - const char* field_term; - const char* enclosed; - const char* line_term; - const char* line_start; - const char* escaped; - int cached_new_format; - uint8 field_term_len,enclosed_len,line_term_len,line_start_len, escaped_len; - char opt_flags; - char empty_flags; - - // store in new format even if old is possible - void force_new_format() { cached_new_format = 1;} - int data_size() - { - return (new_format() ? - field_term_len + enclosed_len + line_term_len + - line_start_len + escaped_len + 6 : 7); - } - bool write_data(IO_CACHE* file); - const char* init(const char* buf, const char* buf_end, bool use_new_format); - bool new_format() - { - return ((cached_new_format != -1) ? cached_new_format : - (cached_new_format=(field_term_len > 1 || - enclosed_len > 1 || - line_term_len > 1 || line_start_len > 1 || - escaped_len > 1))); - } -}; - -/***************************************************************************** - MySQL Binary Log This log consists of events. Each event has a fixed-length header, @@ -843,6 +789,33 @@ typedef struct st_print_event_info #endif /** + This class encapsulates writing of Log_event objects to IO_CACHE. + Automatically calculates the checksum if necessary. +*/ +class Log_event_writer +{ +public: + ulonglong bytes_written; + uint checksum_len; + int write(Log_event *ev); + int write_header(uchar *pos, size_t len); + int write_data(const uchar *pos, size_t len); + int write_footer(); + my_off_t pos() { return my_b_safe_tell(file); } + +Log_event_writer(IO_CACHE *file_arg) : bytes_written(0), file(file_arg) { } + +private: + IO_CACHE *file; + /** + Placeholder for event checksum while writing to binlog. + */ + ha_checksum crc; + + int write_internal(const uchar *pos, size_t len); +}; + +/** the struct aggregates two paramenters that identify an event uniquely in scope of communication of a particular master and slave couple. I.e there can not be 2 events from the same staying connected master which @@ -1108,10 +1081,7 @@ public: */ ulong slave_exec_mode; - /** - Placeholder for event checksum while writing to binlog. - */ - ha_checksum crc; + Log_event_writer *writer; #ifdef MYSQL_SERVER THD* thd; @@ -1143,6 +1113,7 @@ public: } #else Log_event() : temp_buf(0), flags(0) {} + ha_checksum crc; /* print*() functions are used by mysqlbinlog */ virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0; void print_timestamp(IO_CACHE* file, time_t *ts = 0); @@ -1216,23 +1187,26 @@ public: /* Placement version of the above operators */ static void *operator new(size_t, void* ptr) { return ptr; } static void operator delete(void*, void*) { } - bool wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong data_length); #ifdef MYSQL_SERVER - bool write_header(IO_CACHE* file, ulong data_length); - bool write_footer(IO_CACHE* file); + bool write_header(ulong data_length); + bool write_data(const uchar *buf, ulong data_length) + { return writer->write_data(buf, data_length); } + bool write_data(const char *buf, ulong data_length) + { return write_data((uchar*)buf, data_length); } + bool write_footer() + { return writer->write_footer(); } + my_bool need_checksum(); - virtual bool write(IO_CACHE* file) + virtual bool write() { - return(write_header(file, get_data_size()) || - write_data_header(file) || - write_data_body(file) || - write_footer(file)); + return write_header(get_data_size()) || write_data_header() || + write_data_body() || write_footer(); } - virtual bool write_data_header(IO_CACHE* file __attribute__((unused))) + virtual bool write_data_header() { return 0; } - virtual bool write_data_body(IO_CACHE* file __attribute__((unused))) + virtual bool write_data_body() { return 0; } /* Return start of query time or current time */ @@ -1989,8 +1963,8 @@ public: static int dummy_event(String *packet, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg); static int begin_event(String *packet, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg); #ifdef MYSQL_SERVER - bool write(IO_CACHE* file); - virtual bool write_post_header_for_derived(IO_CACHE* file) { return FALSE; } + bool write(); + virtual bool write_post_header_for_derived() { return FALSE; } #endif bool is_valid() const { return query != 0; } @@ -2040,6 +2014,42 @@ public: /* !!! Public in this patch to allow old usage */ }; +/***************************************************************************** + sql_ex_info struct + ****************************************************************************/ +struct sql_ex_info +{ + sql_ex_info() {} /* Remove gcc warning */ + const char* field_term; + const char* enclosed; + const char* line_term; + const char* line_start; + const char* escaped; + int cached_new_format; + uint8 field_term_len,enclosed_len,line_term_len,line_start_len, escaped_len; + char opt_flags; + char empty_flags; + + // store in new format even if old is possible + void force_new_format() { cached_new_format = 1;} + int data_size() + { + return (new_format() ? + field_term_len + enclosed_len + line_term_len + + line_start_len + escaped_len + 6 : 7); + } + bool write_data(Log_event_writer *writer); + const char* init(const char* buf, const char* buf_end, bool use_new_format); + bool new_format() + { + return ((cached_new_format != -1) ? cached_new_format : + (cached_new_format=(field_term_len > 1 || + enclosed_len > 1 || + line_term_len > 1 || line_start_len > 1 || + escaped_len > 1))); + } +}; + /** @class Load_log_event @@ -2333,8 +2343,8 @@ public: return sql_ex.new_format() ? NEW_LOAD_EVENT: LOAD_EVENT; } #ifdef MYSQL_SERVER - bool write_data_header(IO_CACHE* file); - bool write_data_body(IO_CACHE* file); + bool write_data_header(); + bool write_data_body(); #endif bool is_valid() const { return table_name != 0; } int get_data_size() @@ -2422,7 +2432,7 @@ public: my_off_t get_header_len(my_off_t l __attribute__((unused))) { return LOG_EVENT_MINIMAL_HEADER_LEN; } #ifdef MYSQL_SERVER - bool write(IO_CACHE* file); + bool write(); #endif bool is_valid() const { return server_version[0] != 0; } int get_data_size() @@ -2492,7 +2502,7 @@ public: } Log_event_type get_type_code() { return FORMAT_DESCRIPTION_EVENT;} #ifdef MYSQL_SERVER - bool write(IO_CACHE* file); + bool write(); #endif bool header_is_valid() const { @@ -2601,7 +2611,7 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg, const char* get_var_type_name(); int get_data_size() { return 9; /* sizeof(type) + sizeof(val) */;} #ifdef MYSQL_SERVER - bool write(IO_CACHE* file); + bool write(); #endif bool is_valid() const { return 1; } bool is_part_of_group() { return 1; } @@ -2681,7 +2691,7 @@ class Rand_log_event: public Log_event Log_event_type get_type_code() { return RAND_EVENT;} int get_data_size() { return 16; /* sizeof(ulonglong) * 2*/ } #ifdef MYSQL_SERVER - bool write(IO_CACHE* file); + bool write(); #endif bool is_valid() const { return 1; } bool is_part_of_group() { return 1; } @@ -2731,7 +2741,7 @@ class Xid_log_event: public Log_event Log_event_type get_type_code() { return XID_EVENT;} int get_data_size() { return sizeof(xid); } #ifdef MYSQL_SERVER - bool write(IO_CACHE* file); + bool write(); #endif bool is_valid() const { return 1; } @@ -2792,7 +2802,7 @@ public: ~User_var_log_event() {} Log_event_type get_type_code() { return USER_VAR_EVENT;} #ifdef MYSQL_SERVER - bool write(IO_CACHE* file); + bool write(); /* Getter and setter for deferred User-event. Returns true if the event is not applied directly @@ -2944,7 +2954,7 @@ public: int get_data_size() { return ident_len + ROTATE_HEADER_LEN;} bool is_valid() const { return new_log_ident != 0; } #ifdef MYSQL_SERVER - bool write(IO_CACHE* file); + bool write(); #endif private: @@ -2977,7 +2987,7 @@ public: int get_data_size() { return binlog_file_len + BINLOG_CHECKPOINT_HEADER_LEN;} bool is_valid() const { return binlog_file_name != 0; } #ifdef MYSQL_SERVER - bool write(IO_CACHE* file); + bool write(); enum_skip_reason do_shall_skip(rpl_group_info *rgi); #endif }; @@ -3105,7 +3115,7 @@ public: } bool is_valid() const { return seq_no != 0; } #ifdef MYSQL_SERVER - bool write(IO_CACHE *file); + bool write(); static int make_compatible_event(String *packet, bool *need_dummy_event, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg); static bool peek(const char *event_start, size_t event_len, @@ -3220,7 +3230,7 @@ public: bool is_valid() const { return list != NULL; } #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) bool to_packet(String *packet); - bool write(IO_CACHE *file); + bool write(); virtual int do_apply_event(rpl_group_info *rgi); enum_skip_reason do_shall_skip(rpl_group_info *rgi); #endif @@ -3291,13 +3301,13 @@ public: } bool is_valid() const { return inited_from_old || block != 0; } #ifdef MYSQL_SERVER - bool write_data_header(IO_CACHE* file); - bool write_data_body(IO_CACHE* file); + bool write_data_header(); + bool write_data_body(); /* Cut out Create_file extentions and write it as Load event - used on the slave */ - bool write_base(IO_CACHE* file); + bool write_base(); #endif private: @@ -3351,7 +3361,7 @@ public: int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;} bool is_valid() const { return block != 0; } #ifdef MYSQL_SERVER - bool write(IO_CACHE* file); + bool write(); const char* get_db() { return db; } #endif @@ -3392,7 +3402,7 @@ public: int get_data_size() { return DELETE_FILE_HEADER_LEN ;} bool is_valid() const { return file_id != 0; } #ifdef MYSQL_SERVER - bool write(IO_CACHE* file); + bool write(); const char* get_db() { return db; } #endif @@ -3432,7 +3442,7 @@ public: int get_data_size() { return EXEC_LOAD_HEADER_LEN ;} bool is_valid() const { return file_id != 0; } #ifdef MYSQL_SERVER - bool write(IO_CACHE* file); + bool write(); const char* get_db() { return db; } #endif @@ -3532,7 +3542,7 @@ public: ulong get_post_header_size_for_derived(); #ifdef MYSQL_SERVER - bool write_post_header_for_derived(IO_CACHE* file); + bool write_post_header_for_derived(); #endif private: @@ -3596,8 +3606,8 @@ public: virtual bool is_part_of_group() { return 1; } #ifndef MYSQL_CLIENT - virtual bool write_data_header(IO_CACHE*); - virtual bool write_data_body(IO_CACHE*); + virtual bool write_data_header(); + virtual bool write_data_body(); #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -4012,8 +4022,8 @@ public: virtual int get_data_size() { return (uint) m_data_size; } #ifdef MYSQL_SERVER virtual int save_field_metadata(); - virtual bool write_data_header(IO_CACHE *file); - virtual bool write_data_body(IO_CACHE *file); + virtual bool write_data_header(); + virtual bool write_data_body(); virtual const char *get_db() { return m_dbnam; } #endif @@ -4211,8 +4221,8 @@ public: #endif #ifdef MYSQL_SERVER - virtual bool write_data_header(IO_CACHE *file); - virtual bool write_data_body(IO_CACHE *file); + virtual bool write_data_header(); + virtual bool write_data_body(); virtual const char *get_db() { return m_table->s->db.str; } #endif /* @@ -4674,6 +4684,9 @@ public: #ifdef MYSQL_SERVER void pack_info(THD *thd, Protocol*); + + virtual bool write_data_header(); + virtual bool write_data_body(); #endif Incident_log_event(const char *buf, uint event_len, @@ -4689,9 +4702,6 @@ public: virtual int do_apply_event(rpl_group_info *rgi); #endif - virtual bool write_data_header(IO_CACHE *file); - virtual bool write_data_body(IO_CACHE *file); - virtual Log_event_type get_type_code() { return INCIDENT_EVENT; } virtual bool is_valid() const @@ -4752,6 +4762,14 @@ private: uint ident_len; }; +inline int Log_event_writer::write(Log_event *ev) +{ + ev->writer= this; + int res= ev->write(); + IF_DBUG(ev->writer= 0,); // writer must be set before every Log_event::write + return res; +} + /** The function is called by slave applier in case there are active table filtering rules to force gathering events associated diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index 4779e90d815..ce1bf614bc9 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -1753,7 +1753,7 @@ Old_rows_log_event::do_update_pos(rpl_group_info *rgi) #ifndef MYSQL_CLIENT -bool Old_rows_log_event::write_data_header(IO_CACHE *file) +bool Old_rows_log_event::write_data_header() { uchar buf[ROWS_HEADER_LEN]; // No need to init the buffer @@ -1765,15 +1765,15 @@ bool Old_rows_log_event::write_data_header(IO_CACHE *file) { int4store(buf + 0, m_table_id); int2store(buf + 4, m_flags); - return (my_b_safe_write(file, buf, 6)); + return write_data(buf, 6); }); int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id); int2store(buf + RW_FLAGS_OFFSET, m_flags); - return (my_b_safe_write(file, buf, ROWS_HEADER_LEN)); + return write_data(buf, ROWS_HEADER_LEN); } -bool Old_rows_log_event::write_data_body(IO_CACHE*file) +bool Old_rows_log_event::write_data_body() { /* Note that this should be the number of *bits*, not the number of @@ -1790,13 +1790,12 @@ bool Old_rows_log_event::write_data_body(IO_CACHE*file) DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf)); - res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf)); + res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf)); DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); - res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap, - no_bytes_in_map(&m_cols)); + res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols)); DBUG_DUMP("rows", m_rows_buf, data_size); - res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size); + res= res || write_data(m_rows_buf, (size_t) data_size); return res; diff --git a/sql/log_event_old.h b/sql/log_event_old.h index ed07f753e7a..edc74ca1a6f 100644 --- a/sql/log_event_old.h +++ b/sql/log_event_old.h @@ -134,8 +134,8 @@ public: ulong get_table_id() const { return m_table_id; } #ifndef MYSQL_CLIENT - virtual bool write_data_header(IO_CACHE *file); - virtual bool write_data_body(IO_CACHE *file); + virtual bool write_data_header(); + virtual bool write_data_body(); virtual const char *get_db() { return m_table->s->db.str; } #endif /* diff --git a/sql/slave.cc b/sql/slave.cc index de6bf95b50e..13acc8c62ba 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -5501,7 +5501,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->master_log_name, rev.new_log_ident); mysql_mutex_lock(log_lock); - if (likely(!fdle.write(rli->relay_log.get_log_file()) && + if (likely(!rli->relay_log.write_event(&fdle) && !rli->relay_log.flush_and_sync(NULL))) { rli->relay_log.harvest_bytes_written(&rli->log_space_total); diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc index 0bc04ebb066..36917674128 100644 --- a/sql/wsrep_binlog.cc +++ b/sql/wsrep_binlog.cc @@ -445,6 +445,7 @@ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf, char filename[PATH_MAX]= {0}; File file; IO_CACHE cache; + Log_event_writer writer(&cache); Format_description_log_event *ev= wsrep_get_apply_format(thd); int len= my_snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld_v2.log", @@ -476,7 +477,7 @@ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf, goto cleanup2; } - if (ev->write(&cache) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) || + if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) || flush_io_cache(&cache)) { WSREP_ERROR("Failed to write to '%s'.", filename); diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 0a44dd278ac..a785b8764cf 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -1215,6 +1215,7 @@ int wsrep_to_buf_helper( THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len) { IO_CACHE tmp_io_cache; + Log_event_writer writer(&tmp_io_cache); if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, 65536, MYF(MY_WME))) return 1; @@ -1222,7 +1223,7 @@ int wsrep_to_buf_helper( Format_description_log_event *tmp_fd= new Format_description_log_event(4); tmp_fd->checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options; - tmp_fd->write(&tmp_io_cache); + writer.write(tmp_fd); delete tmp_fd; #ifdef GTID_SUPPORT @@ -1230,7 +1231,7 @@ int wsrep_to_buf_helper( { Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next); if (!gtid_ev.is_valid()) ret= 0; - if (!ret && gtid_ev.write(&tmp_io_cache)) ret= 1; + if (!ret && writer.write(>id_ev)) ret= 1; } #endif /* GTID_SUPPORT */ @@ -1240,12 +1241,12 @@ int wsrep_to_buf_helper( Query_log_event ev(thd, thd->wsrep_TOI_pre_query, thd->wsrep_TOI_pre_query_len, FALSE, FALSE, FALSE, 0); - if (ev.write(&tmp_io_cache)) ret= 1; + if (writer.write(&ev)) ret= 1; } /* continue to append the actual query */ Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); - if (!ret && ev.write(&tmp_io_cache)) ret= 1; + if (!ret && writer.write(&ev)) ret= 1; if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1; close_cached_file(&tmp_io_cache); return ret; |