summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/log.cc259
-rw-r--r--sql/log.h3
-rw-r--r--sql/log_event.cc461
-rw-r--r--sql/log_event.h214
-rw-r--r--sql/log_event_old.cc15
-rw-r--r--sql/log_event_old.h4
-rw-r--r--sql/slave.cc2
-rw-r--r--sql/wsrep_binlog.cc3
-rw-r--r--sql/wsrep_mysqld.cc9
9 files changed, 456 insertions, 514 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 0944c6e8b42..8d8c96f6703 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -3461,7 +3461,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
if (!s.is_valid())
goto err;
s.dont_set_created= null_created_arg;
- if (s.write(&log_file))
+ if (write_event(&s))
goto err;
bytes_written+= s.data_written;
@@ -3504,7 +3504,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
*/
Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0);
- if (gl_ev.write(&log_file))
+ if (write_event(&gl_ev))
goto err;
/* Output a binlog checkpoint event at the start of the binlog file. */
@@ -3555,7 +3555,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
flush_io_cache(&log_file);
mysql_file_sync(log_file.file, MYF(MY_WME));
DBUG_SUICIDE(););
- if (ev.write(&log_file))
+ if (write_event(&ev))
goto err;
bytes_written+= ev.data_written;
}
@@ -3587,7 +3587,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
/* Don't set log_pos in event header */
description_event_for_queue->set_artificial_event();
- if (description_event_for_queue->write(&log_file))
+ if (write_event(description_event_for_queue))
goto err;
bytes_written+= description_event_for_queue->data_written;
}
@@ -4991,7 +4991,7 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
r.checksum_alg= relay_log_checksum_alg;
DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) ||
- (error= r.write(&log_file)))
+ (error= write_event(&r)))
{
DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno=2;);
close_on_error= TRUE;
@@ -5105,9 +5105,13 @@ end:
DBUG_RETURN(error);
}
+bool MYSQL_BIN_LOG::write_event(Log_event *ev, IO_CACHE *file)
+{
+ Log_event_writer writer(file);
+ return writer.write(ev);
+}
-bool
-MYSQL_BIN_LOG::append(Log_event *ev)
+bool MYSQL_BIN_LOG::append(Log_event *ev)
{
bool res;
mysql_mutex_lock(&LOCK_log);
@@ -5124,11 +5128,8 @@ bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev)
mysql_mutex_assert_owner(&LOCK_log);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
- /*
- Log_event::write() is smart enough to use my_b_write() or
- my_b_append() depending on the kind of cache we have.
- */
- if (ev->write(&log_file))
+
+ if (write_event(ev))
{
error=1;
goto err;
@@ -5516,15 +5517,16 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
IO_CACHE *file=
cache_mngr->get_binlog_cache_log(use_trans_cache(this, is_transactional));
+ Log_event_writer writer(file);
if (with_annotate && *with_annotate)
{
Annotate_rows_log_event anno(table->in_use, is_transactional, false);
/* Annotate event should be written not more than once */
*with_annotate= 0;
- if ((error= anno.write(file)))
+ if ((error= writer.write(&anno)))
DBUG_RETURN(error);
}
- if ((error= the_event.write(file)))
+ if ((error= writer.write(&the_event)))
DBUG_RETURN(error);
binlog_table_maps++;
@@ -5651,14 +5653,14 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
if (Rows_log_event* pending= cache_data->pending())
{
- IO_CACHE *file= &cache_data->cache_log;
+ Log_event_writer writer(&cache_data->cache_log);
/*
Write pending event to the cache.
*/
DBUG_EXECUTE_IF("simulate_disk_full_at_flush_pending",
{DBUG_SET("+d,simulate_file_write_error");});
- if (pending->write(file))
+ if (writer.write(pending))
{
set_write_error(thd, is_transactional);
if (check_write_error(thd) && cache_data &&
@@ -5746,7 +5748,8 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
commit_id);
/* Write the event to the binary log. */
- if (gtid_event.write(&mysql_bin_log.log_file))
+ DBUG_ASSERT(this == &mysql_bin_log);
+ if (write_event(&gtid_event))
DBUG_RETURN(true);
status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written);
@@ -6080,7 +6083,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
Annotate_rows_log_event anno(thd, using_trans, direct);
/* Annotate event should be written not more than once */
*with_annotate= 0;
- if (anno.write(file))
+ if (write_event(&anno, file))
goto err;
}
@@ -6094,7 +6097,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
Intvar_log_event e(thd,(uchar) LAST_INSERT_ID_EVENT,
thd->first_successful_insert_id_in_prev_stmt_for_binlog,
using_trans, direct);
- if (e.write(file))
+ if (write_event(&e, file))
goto err;
}
if (thd->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements() > 0)
@@ -6105,14 +6108,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
Intvar_log_event e(thd, (uchar) INSERT_ID_EVENT,
thd->auto_inc_intervals_in_cur_stmt_for_binlog.
minimum(), using_trans, direct);
- if (e.write(file))
+ if (write_event(&e, file))
goto err;
}
if (thd->rand_used)
{
Rand_log_event e(thd,thd->rand_saved_seed1,thd->rand_saved_seed2,
using_trans, direct);
- if (e.write(file))
+ if (write_event(&e, file))
goto err;
}
if (thd->user_var_events.elements)
@@ -6136,7 +6139,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
flags,
using_trans,
direct);
- if (e.write(file))
+ if (write_event(&e, file))
goto err;
}
}
@@ -6146,7 +6149,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
/*
Write the event.
*/
- if (event_info->write(file) ||
+ if (write_event(event_info, file) ||
DBUG_EVALUATE_IF("injecting_fault_writing", 1, 0))
goto err;
@@ -6525,33 +6528,34 @@ uint MYSQL_BIN_LOG::next_file_id()
return res;
}
+class CacheWriter: public Log_event_writer
+{
+public:
+ ulong remains;
-/**
- Calculate checksum of possibly a part of an event containing at least
- the whole common header.
-
- @param buf the pointer to trans cache's buffer
- @param off the offset of the beginning of the event in the buffer
- @param event_len no-checksum length of the event
- @param length the current size of the buffer
-
- @param crc [in-out] the checksum
+ CacheWriter(THD *thd_arg, IO_CACHE *file_arg, bool do_checksum)
+ : Log_event_writer(file_arg), remains(0), thd(thd_arg), first(true)
+ { checksum_len= do_checksum ? BINLOG_CHECKSUM_LEN : 0; }
- Event size in incremented by @c BINLOG_CHECKSUM_LEN.
+ ~CacheWriter()
+ { status_var_add(thd->status_var.binlog_bytes_written, bytes_written); }
- @return 0 or number of unprocessed yet bytes of the event excluding
- the checksum part.
-*/
- static ulong fix_log_event_crc(uchar *buf, uint off, uint event_len,
- uint length, ha_checksum *crc)
-{
- ulong ret;
- uchar *event_begin= buf + off;
+ int write(uchar* pos, size_t len)
+ {
+ if (first)
+ write_header(pos, len);
+ else
+ write_data(pos, len);
- ret= length >= off + event_len ? 0 : off + event_len - length;
- *crc= my_checksum(*crc, event_begin, event_len - ret);
- return ret;
-}
+ remains -= len;
+ if ((first= !remains))
+ write_footer();
+ return 0;
+ }
+private:
+ THD *thd;
+ bool first;
+};
/*
Write the contents of a cache to the binary log.
@@ -6572,21 +6576,19 @@ uint MYSQL_BIN_LOG::next_file_id()
int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
{
+ DBUG_ENTER("MYSQL_BIN_LOG::write_cache");
+
mysql_mutex_assert_owner(&LOCK_log);
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
- return ER_ERROR_ON_WRITE;
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
- ulong remains= 0; // part of unprocessed yet netto length of the event
long val;
ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
uchar header[LOG_EVENT_HEADER_LEN];
- ha_checksum crc= 0;
- my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF);
- uchar buf[BINLOG_CHECKSUM_LEN];
- DBUG_ENTER("MYSQL_BIN_LOG::write_cache");
+ CacheWriter writer(thd, &log_file, binlog_checksum_options);
// while there is just one alg the following must hold:
- DBUG_ASSERT(!do_checksum ||
+ DBUG_ASSERT(binlog_checksum_options == BINLOG_CHECKSUM_ALG_OFF ||
binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32);
/*
@@ -6615,53 +6617,40 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
if (unlikely(carry > 0))
{
DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN);
+ uint tail= LOG_EVENT_HEADER_LEN - carry;
/* assemble both halves */
- memcpy(&header[carry], (char *)cache->read_pos,
- LOG_EVENT_HEADER_LEN - carry);
+ memcpy(&header[carry], (char *)cache->read_pos, tail);
+
+ ulong len= uint4korr(header + EVENT_LEN_OFFSET);
+ writer.remains= len;
/* fix end_log_pos */
- val= uint4korr(&header[LOG_POS_OFFSET]) + group +
- (end_log_pos_inc+= (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
- int4store(&header[LOG_POS_OFFSET], val);
+ end_log_pos_inc += writer.checksum_len;
+ val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc;
+ int4store(header + LOG_POS_OFFSET, val);
- if (do_checksum)
- {
- ulong len= uint4korr(&header[EVENT_LEN_OFFSET]);
- /* fix len */
- int4store(&header[EVENT_LEN_OFFSET], len + BINLOG_CHECKSUM_LEN);
- }
+ /* fix len */
+ len+= writer.checksum_len;
+ int4store(header + EVENT_LEN_OFFSET, len);
- /* write the first half of the split header */
- if (my_b_write(&log_file, header, carry))
+ if (writer.write(header, LOG_EVENT_HEADER_LEN))
DBUG_RETURN(ER_ERROR_ON_WRITE);
- status_var_add(thd->status_var.binlog_bytes_written, carry);
- /*
- copy fixed second half of header to cache so the correct
- version will be written later.
- */
- memcpy((char *)cache->read_pos, &header[carry],
- LOG_EVENT_HEADER_LEN - carry);
+ cache->read_pos+= tail;
+ length-= tail;
+ carry= 0;
/* next event header at ... */
- hdr_offs= uint4korr(&header[EVENT_LEN_OFFSET]) - carry -
- (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
-
- if (do_checksum)
- {
- DBUG_ASSERT(crc == 0 && remains == 0);
- crc= my_checksum(crc, header, carry);
- remains= uint4korr(header + EVENT_LEN_OFFSET) - carry -
- BINLOG_CHECKSUM_LEN;
- }
- carry= 0;
+ hdr_offs= len - LOG_EVENT_HEADER_LEN - writer.checksum_len;
}
/* if there is anything to write, process it. */
if (likely(length > 0))
{
+ DBUG_EXECUTE_IF("fail_binlog_write_1",
+ errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE););
/*
process all event-headers in this (partial) cache.
if next header is beyond current read-buffer,
@@ -6669,52 +6658,28 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
very next iteration, just "eventually").
*/
- /* crc-calc the whole buffer */
- if (do_checksum && hdr_offs >= length)
+ if (hdr_offs >= length)
{
-
- DBUG_ASSERT(remains != 0 && crc != 0);
-
- crc= my_checksum(crc, cache->read_pos, length);
- remains -= length;
- if (my_b_write(&log_file, cache->read_pos, length))
+ if (writer.write(cache->read_pos, length))
DBUG_RETURN(ER_ERROR_ON_WRITE);
- if (remains == 0)
- {
- int4store(buf, crc);
- if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- crc= 0;
- }
}
while (hdr_offs < length)
{
/*
- partial header only? save what we can get, process once
- we get the rest.
+ finish off with remains of the last event that crawls
+ from previous into the current buffer
*/
-
- if (do_checksum)
+ if (writer.remains != 0)
{
- if (remains != 0)
- {
- /*
- finish off with remains of the last event that crawls
- from previous into the current buffer
- */
- DBUG_ASSERT(crc != 0);
- crc= my_checksum(crc, cache->read_pos, hdr_offs);
- int4store(buf, crc);
- remains -= hdr_offs;
- DBUG_ASSERT(remains == 0);
- if (my_b_write(&log_file, cache->read_pos, hdr_offs) ||
- my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- crc= 0;
- }
+ if (writer.write(cache->read_pos, hdr_offs))
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
}
+ /*
+ partial header only? save what we can get, process once
+ we get the rest.
+ */
if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
{
carry= length - hdr_offs;
@@ -6725,37 +6690,25 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
{
/* we've got a full event-header, and it came in one piece */
uchar *ev= (uchar *)cache->read_pos + hdr_offs;
- uint event_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
+ uint ev_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
uchar *log_pos= ev + LOG_POS_OFFSET;
+ end_log_pos_inc += writer.checksum_len;
/* fix end_log_pos */
- val= uint4korr(log_pos) + group +
- (end_log_pos_inc += (do_checksum ? BINLOG_CHECKSUM_LEN : 0));
+ val= uint4korr(log_pos) + group + end_log_pos_inc;
int4store(log_pos, val);
- /* fix CRC */
- if (do_checksum)
- {
- /* fix length */
- int4store(ev + EVENT_LEN_OFFSET, event_len + BINLOG_CHECKSUM_LEN);
- remains= fix_log_event_crc(cache->read_pos, hdr_offs, event_len,
- length, &crc);
- if (my_b_write(&log_file, ev,
- remains == 0 ? event_len : length - hdr_offs))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- if (remains == 0)
- {
- int4store(buf, crc);
- if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- crc= 0; // crc is complete
- }
- }
+ /* fix length */
+ int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len);
+
+ writer.remains= ev_len;
+ if (writer.write(ev, std::min<uint>(ev_len, length - hdr_offs)))
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
/* next event header at ... */
- hdr_offs += event_len; // incr by the netto len
+ hdr_offs += ev_len; // incr by the netto len
- DBUG_ASSERT(!do_checksum || remains == 0 || hdr_offs >= length);
+ DBUG_ASSERT(!writer.checksum_len || writer.remains == 0 || hdr_offs >= length);
}
}
@@ -6769,20 +6722,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
*/
hdr_offs -= length;
}
-
- /* Write data to the binary log file */
- DBUG_EXECUTE_IF("fail_binlog_write_1",
- errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE););
- if (!do_checksum)
- if (my_b_write(&log_file, cache->read_pos, length))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
- status_var_add(thd->status_var.binlog_bytes_written, length);
-
} while ((length= my_b_fill(cache)));
DBUG_ASSERT(carry == 0);
- DBUG_ASSERT(!do_checksum || remains == 0);
- DBUG_ASSERT(!do_checksum || crc == 0);
+ DBUG_ASSERT(!writer.checksum_len || writer.remains == 0);
DBUG_RETURN(0); // All OK
}
@@ -6827,7 +6770,7 @@ bool MYSQL_BIN_LOG::write_incident_already_locked(THD *thd)
if (likely(is_open()))
{
- error= ev.write(&log_file);
+ error= write_event(&ev);
status_var_add(thd->status_var.binlog_bytes_written, ev.data_written);
}
@@ -6889,7 +6832,7 @@ MYSQL_BIN_LOG::write_binlog_checkpoint_event_already_locked(const char *name_arg
Otherwise a subsequent log purge could delete binlogs that XA recovery
thinks are needed (even though they are not really).
*/
- if (!ev.write(&log_file) && !flush_and_sync(0))
+ if (!write_event(&ev) && !flush_and_sync(0))
{
signal_update();
}
@@ -7797,7 +7740,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
DBUG_RETURN(ER_ERROR_ON_WRITE);
});
- if (entry->end_event->write(&log_file))
+ if (write_event(entry->end_event))
{
entry->error_cache= NULL;
DBUG_RETURN(ER_ERROR_ON_WRITE);
@@ -7807,7 +7750,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
if (entry->incident_event)
{
- if (entry->incident_event->write(&log_file))
+ if (write_event(entry->incident_event))
{
entry->error_cache= NULL;
DBUG_RETURN(ER_ERROR_ON_WRITE);
@@ -8066,7 +8009,7 @@ void MYSQL_BIN_LOG::close(uint exiting)
: (enum_binlog_checksum_alg)binlog_checksum_options;
DBUG_ASSERT(!is_relay_log ||
relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
- s.write(&log_file);
+ write_event(&s);
bytes_written+= s.data_written;
signal_update();
diff --git a/sql/log.h b/sql/log.h
index fcec606443f..31cb4b2f0ae 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -737,6 +737,9 @@ public:
void stop_union_events(THD *thd);
bool is_query_in_union(THD *thd, query_id_t query_id_param);
+ bool write_event(Log_event *ev, IO_CACHE *file);
+ bool write_event(Log_event *ev) { return write_event(ev, &log_file); }
+
/*
v stands for vector
invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0)
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 20ce96c2f7b..8617e35953e 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -668,19 +668,6 @@ static void cleanup_load_tmpdir(LEX_STRING *connection_name)
/*
- write_str()
-*/
-
-static bool write_str(IO_CACHE *file, const char *str, uint length)
-{
- uchar tmp[1];
- tmp[0]= (uchar) length;
- return (my_b_safe_write(file, tmp, sizeof(tmp)) ||
- my_b_safe_write(file, (uchar*) str, length));
-}
-
-
-/*
read_str()
*/
@@ -845,8 +832,7 @@ const char* Log_event::get_type_str()
#ifndef MYSQL_CLIENT
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
- :log_pos(0), temp_buf(0), exec_time(0),
- crc(0), thd(thd_arg),
+ :log_pos(0), temp_buf(0), exec_time(0), thd(thd_arg),
checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
server_id= thd->variables.server_id;
@@ -870,8 +856,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
*/
Log_event::Log_event()
- :temp_buf(0), exec_time(0), flags(0),
- cache_type(Log_event::EVENT_INVALID_CACHE), crc(0),
+ :temp_buf(0), exec_time(0), flags(0), cache_type(EVENT_INVALID_CACHE),
thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
server_id= global_system_variables.server_id;
@@ -893,7 +878,7 @@ Log_event::Log_event()
Log_event::Log_event(const char* buf,
const Format_description_log_event* description_event)
:temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE),
- crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
+ checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
#ifndef MYSQL_CLIENT
thd = 0;
@@ -1159,26 +1144,51 @@ my_bool Log_event::need_checksum()
DBUG_RETURN(ret);
}
-bool Log_event::wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong size)
+int Log_event_writer::write_internal(const uchar *pos, size_t len)
{
- if (need_checksum() && size != 0)
- crc= my_checksum(crc, buf, size);
-
- return my_b_safe_write(file, buf, size);
+ if (my_b_safe_write(file, pos, len))
+ return 1;
+ bytes_written+= len;
+ return 0;
}
-bool Log_event::write_footer(IO_CACHE* file)
+int Log_event_writer::write_header(uchar *pos, size_t len)
{
DBUG_ENTER("write_footer");
/*
- (optional) footer contains the checksum value
+ recording checksum of FD event computed with dropped
+ possibly active LOG_EVENT_BINLOG_IN_USE_F flag.
+ Similar step at verication: the active flag is dropped before
+ checksum computing.
*/
- if (need_checksum())
+ if (checksum_len)
+ {
+ uchar save=pos[FLAGS_OFFSET];
+ pos[FLAGS_OFFSET]&= ~LOG_EVENT_BINLOG_IN_USE_F;
+ crc= my_checksum(0, pos, len);
+ pos[FLAGS_OFFSET]= save;
+ }
+
+ return write_internal(pos, len);
+}
+
+int Log_event_writer::write_data(const uchar *pos, size_t len)
+{
+ if (checksum_len)
+ crc= my_checksum(crc, pos, len);
+
+ return write_internal(pos, len);
+}
+
+int Log_event_writer::write_footer()
+{
+ DBUG_ENTER("Log_event_writer::write_footer");
+ if (checksum_len)
{
- DBUG_PRINT("info", ("Writing checksum"));
- uchar buf[BINLOG_CHECKSUM_LEN];
- int4store(buf, crc);
- DBUG_RETURN(my_b_safe_write(file, (uchar*) buf, sizeof(buf)));
+ uchar checksum_buf[BINLOG_CHECKSUM_LEN];
+ int4store(checksum_buf, crc);
+ if (write_internal(checksum_buf, BINLOG_CHECKSUM_LEN))
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
}
DBUG_RETURN(0);
}
@@ -1187,24 +1197,19 @@ bool Log_event::write_footer(IO_CACHE* file)
Log_event::write_header()
*/
-bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
+bool Log_event::write_header(ulong event_data_length)
{
uchar header[LOG_EVENT_HEADER_LEN];
ulong now;
- bool ret;
DBUG_ENTER("Log_event::write_header");
DBUG_PRINT("enter", ("filepos: %lld length: %lu type: %d",
- (longlong) my_b_tell(file), event_data_length,
+ (longlong) writer->pos(), event_data_length,
(int) get_type_code()));
- /* Store number of bytes that will be written by this event */
- data_written= event_data_length + sizeof(header);
+ writer->checksum_len= need_checksum() ? BINLOG_CHECKSUM_LEN : 0;
- if (need_checksum())
- {
- crc= 0;
- data_written += BINLOG_CHECKSUM_LEN;
- }
+ /* Store number of bytes that will be written by this event */
+ data_written= event_data_length + sizeof(header) + writer->checksum_len;
/*
log_pos != 0 if this is relay-log event. In this case we should not
@@ -1226,7 +1231,7 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
(end of this event, that is).
*/
- log_pos= my_b_safe_tell(file)+data_written;
+ log_pos= writer->pos() + data_written;
}
now= get_time(); // Query start time
@@ -1243,36 +1248,10 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
int4store(header+ SERVER_ID_OFFSET, server_id);
int4store(header+ EVENT_LEN_OFFSET, data_written);
int4store(header+ LOG_POS_OFFSET, log_pos);
- /*
- recording checksum of FD event computed with dropped
- possibly active LOG_EVENT_BINLOG_IN_USE_F flag.
- Similar step at verication: the active flag is dropped before
- checksum computing.
- */
- if (header[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT ||
- !need_checksum() || !(flags & LOG_EVENT_BINLOG_IN_USE_F))
- {
- int2store(header+ FLAGS_OFFSET, flags);
- ret= wrapper_my_b_safe_write(file, header, sizeof(header)) != 0;
- }
- else
- {
- ret= (wrapper_my_b_safe_write(file, header, FLAGS_OFFSET) != 0);
- if (!ret)
- {
- flags &= ~LOG_EVENT_BINLOG_IN_USE_F;
- int2store(header + FLAGS_OFFSET, flags);
- crc= my_checksum(crc, header + FLAGS_OFFSET, sizeof(flags));
- flags |= LOG_EVENT_BINLOG_IN_USE_F;
- int2store(header + FLAGS_OFFSET, flags);
- ret= (my_b_safe_write(file, header + FLAGS_OFFSET, sizeof(flags)) != 0);
- }
- if (!ret)
- ret= (wrapper_my_b_safe_write(file, header + FLAGS_OFFSET + sizeof(flags),
- sizeof(header)
- - (FLAGS_OFFSET + sizeof(flags))) != 0);
- }
- DBUG_RETURN( ret);
+ int2store(header + FLAGS_OFFSET, flags);
+
+ bool ret= writer->write_header(header, sizeof(header));
+ DBUG_RETURN(ret);
}
#endif /* !MYSQL_CLIENT */
@@ -1664,9 +1643,11 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
if (ev)
{
ev->checksum_alg= alg;
+#ifdef MYSQL_CLIENT
if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
ev->crc= uint4korr(buf + (event_len));
+#endif
}
DBUG_PRINT("read_event", ("%s(type_code: %u; event_len: %u)",
@@ -2749,7 +2730,7 @@ static void store_str_with_code_and_len(uchar **dst, const char *src,
will print!
*/
-bool Query_log_event::write(IO_CACHE* file)
+bool Query_log_event::write()
{
uchar buf[QUERY_HEADER_LEN + MAX_SIZE_LOG_EVENT_STATUS];
uchar *start, *start_of_status;
@@ -2978,14 +2959,13 @@ bool Query_log_event::write(IO_CACHE* file)
*/
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
- return (write_header(file, event_length) ||
- wrapper_my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) ||
- write_post_header_for_derived(file) ||
- wrapper_my_b_safe_write(file, (uchar*) start_of_status,
- (uint) (start-start_of_status)) ||
- wrapper_my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) ||
- wrapper_my_b_safe_write(file, (uchar*) query, q_len) ||
- write_footer(file)) ? 1 : 0;
+ return write_header(event_length) ||
+ write_data(buf, QUERY_HEADER_LEN) ||
+ write_post_header_for_derived() ||
+ write_data(start_of_status, (uint) (start-start_of_status)) ||
+ write_data(safe_str(db), db_len + 1) ||
+ write_data(query, q_len) ||
+ write_footer();
}
/**
@@ -4610,7 +4590,7 @@ Start_log_event_v3::Start_log_event_v3(const char* buf, uint event_len,
*/
#ifndef MYSQL_CLIENT
-bool Start_log_event_v3::write(IO_CACHE* file)
+bool Start_log_event_v3::write()
{
char buff[START_V3_HEADER_LEN];
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
@@ -4618,9 +4598,9 @@ bool Start_log_event_v3::write(IO_CACHE* file)
if (!dont_set_created)
created= get_time(); // this sets when and when_sec_part as a side effect
int4store(buff + ST_CREATED_OFFSET,created);
- return (write_header(file, sizeof(buff)) ||
- wrapper_my_b_safe_write(file, (uchar*) buff, sizeof(buff)) ||
- write_footer(file));
+ return write_header(sizeof(buff)) ||
+ write_data(buff, sizeof(buff)) ||
+ write_footer();
}
#endif
@@ -4933,7 +4913,7 @@ Format_description_log_event(const char* buf,
}
#ifndef MYSQL_CLIENT
-bool Format_description_log_event::write(IO_CACHE* file)
+bool Format_description_log_event::write()
{
bool ret;
bool no_checksum;
@@ -4981,12 +4961,11 @@ bool Format_description_log_event::write(IO_CACHE* file)
{
checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway
}
- ret= (write_header(file, rec_size) ||
- wrapper_my_b_safe_write(file, buff, sizeof(buff)) ||
- wrapper_my_b_safe_write(file, (uchar*)post_header_len,
- number_of_event_types) ||
- wrapper_my_b_safe_write(file, &checksum_byte, sizeof(checksum_byte)) ||
- write_footer(file));
+ ret= write_header(rec_size) ||
+ write_data(buff, sizeof(buff)) ||
+ write_data(post_header_len, number_of_event_types) ||
+ write_data(&checksum_byte, sizeof(checksum_byte)) ||
+ write_footer();
if (no_checksum)
checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
return ret;
@@ -5332,7 +5311,7 @@ void Load_log_event::pack_info(THD *thd, Protocol *protocol)
Load_log_event::write_data_header()
*/
-bool Load_log_event::write_data_header(IO_CACHE* file)
+bool Load_log_event::write_data_header()
{
char buf[LOAD_HEADER_LEN];
int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
@@ -5341,7 +5320,7 @@ bool Load_log_event::write_data_header(IO_CACHE* file)
buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
buf[L_DB_LEN_OFFSET] = (char)db_len;
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
- return my_b_safe_write(file, (uchar*)buf, LOAD_HEADER_LEN) != 0;
+ return write_data(buf, LOAD_HEADER_LEN) != 0;
}
@@ -5349,19 +5328,19 @@ bool Load_log_event::write_data_header(IO_CACHE* file)
Load_log_event::write_data_body()
*/
-bool Load_log_event::write_data_body(IO_CACHE* file)
+bool Load_log_event::write_data_body()
{
- if (sql_ex.write_data(file))
+ if (sql_ex.write_data(writer))
return 1;
if (num_fields && fields && field_lens)
{
- if (my_b_safe_write(file, (uchar*)field_lens, num_fields) ||
- my_b_safe_write(file, (uchar*)fields, field_block_len))
+ if (write_data(field_lens, num_fields) ||
+ write_data(fields, field_block_len))
return 1;
}
- return (my_b_safe_write(file, (uchar*)table_name, table_name_len + 1) ||
- my_b_safe_write(file, (uchar*)db, db_len + 1) ||
- my_b_safe_write(file, (uchar*)fname, fname_len));
+ return (write_data(table_name, table_name_len + 1) ||
+ write_data(db, db_len + 1) ||
+ write_data(fname, fname_len));
}
@@ -6078,15 +6057,14 @@ Rotate_log_event::Rotate_log_event(const char* buf, uint event_len,
*/
#ifndef MYSQL_CLIENT
-bool Rotate_log_event::write(IO_CACHE* file)
+bool Rotate_log_event::write()
{
char buf[ROTATE_HEADER_LEN];
int8store(buf + R_POS_OFFSET, pos);
- return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
- wrapper_my_b_safe_write(file, (uchar*) buf, ROTATE_HEADER_LEN) ||
- wrapper_my_b_safe_write(file, (uchar*) new_log_ident,
- (uint) ident_len) ||
- write_footer(file));
+ return (write_header(ROTATE_HEADER_LEN + ident_len) ||
+ write_data(buf, ROTATE_HEADER_LEN) ||
+ write_data(new_log_ident, (uint) ident_len) ||
+ write_footer());
}
#endif
@@ -6276,15 +6254,14 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event(
#ifndef MYSQL_CLIENT
-bool Binlog_checkpoint_log_event::write(IO_CACHE *file)
+bool Binlog_checkpoint_log_event::write()
{
uchar buf[BINLOG_CHECKPOINT_HEADER_LEN];
int4store(buf, binlog_file_len);
- return write_header(file, BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) ||
- wrapper_my_b_safe_write(file, buf, BINLOG_CHECKPOINT_HEADER_LEN) ||
- wrapper_my_b_safe_write(file, (const uchar *)binlog_file_name,
- binlog_file_len) ||
- write_footer(file);
+ return write_header(BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) ||
+ write_data(buf, BINLOG_CHECKPOINT_HEADER_LEN) ||
+ write_data(binlog_file_name, binlog_file_len) ||
+ write_footer();
}
#endif /* MYSQL_CLIENT */
@@ -6386,7 +6363,7 @@ Gtid_log_event::peek(const char *event_start, size_t event_len,
bool
-Gtid_log_event::write(IO_CACHE *file)
+Gtid_log_event::write()
{
uchar buf[GTID_HEADER_LEN+2];
size_t write_len;
@@ -6404,9 +6381,9 @@ Gtid_log_event::write(IO_CACHE *file)
bzero(buf+13, GTID_HEADER_LEN-13);
write_len= GTID_HEADER_LEN;
}
- return write_header(file, write_len) ||
- wrapper_my_b_safe_write(file, buf, write_len) ||
- write_footer(file);
+ return write_header(write_len) ||
+ write_data(buf, write_len) ||
+ write_footer();
}
@@ -6764,7 +6741,7 @@ Gtid_list_log_event::to_packet(String *packet)
bool
-Gtid_list_log_event::write(IO_CACHE *file)
+Gtid_list_log_event::write()
{
char buf[128];
String packet(buf, sizeof(buf), system_charset_info);
@@ -6772,10 +6749,9 @@ Gtid_list_log_event::write(IO_CACHE *file)
packet.length(0);
if (to_packet(&packet))
return true;
- return
- write_header(file, get_data_size()) ||
- wrapper_my_b_safe_write(file, (uchar *)packet.ptr(), packet.length()) ||
- write_footer(file);
+ return write_header(get_data_size()) ||
+ write_data(packet.ptr(), packet.length()) ||
+ write_footer();
}
@@ -6980,14 +6956,14 @@ const char* Intvar_log_event::get_var_type_name()
*/
#ifndef MYSQL_CLIENT
-bool Intvar_log_event::write(IO_CACHE* file)
+bool Intvar_log_event::write()
{
uchar buf[9];
buf[I_TYPE_OFFSET]= (uchar) type;
int8store(buf + I_VAL_OFFSET, val);
- return (write_header(file, sizeof(buf)) ||
- wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
- write_footer(file));
+ return write_header(sizeof(buf)) ||
+ write_data(buf, sizeof(buf)) ||
+ write_footer();
}
#endif
@@ -7110,14 +7086,14 @@ Rand_log_event::Rand_log_event(const char* buf,
#ifndef MYSQL_CLIENT
-bool Rand_log_event::write(IO_CACHE* file)
+bool Rand_log_event::write()
{
uchar buf[16];
int8store(buf + RAND_SEED1_OFFSET, seed1);
int8store(buf + RAND_SEED2_OFFSET, seed2);
- return (write_header(file, sizeof(buf)) ||
- wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
- write_footer(file));
+ return write_header(sizeof(buf)) ||
+ write_data(buf, sizeof(buf)) ||
+ write_footer();
}
#endif
@@ -7236,12 +7212,12 @@ Xid_log_event(const char* buf,
#ifndef MYSQL_CLIENT
-bool Xid_log_event::write(IO_CACHE* file)
+bool Xid_log_event::write()
{
DBUG_EXECUTE_IF("do_not_write_xid", return 0;);
- return (write_header(file, sizeof(xid)) ||
- wrapper_my_b_safe_write(file, (uchar*) &xid, sizeof(xid)) ||
- write_footer(file));
+ return write_header(sizeof(xid)) ||
+ write_data((uchar*)&xid, sizeof(xid)) ||
+ write_footer();
}
#endif
@@ -7587,7 +7563,7 @@ err:
#ifndef MYSQL_CLIENT
-bool User_var_log_event::write(IO_CACHE* file)
+bool User_var_log_event::write()
{
char buf[UV_NAME_LEN_SIZE];
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
@@ -7642,13 +7618,13 @@ bool User_var_log_event::write(IO_CACHE* file)
/* Length of the whole event */
event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len;
- return (write_header(file, event_length) ||
- wrapper_my_b_safe_write(file, (uchar*) buf, sizeof(buf)) ||
- wrapper_my_b_safe_write(file, (uchar*) name, name_len) ||
- wrapper_my_b_safe_write(file, (uchar*) buf1, buf1_length) ||
- wrapper_my_b_safe_write(file, pos, val_len) ||
- wrapper_my_b_safe_write(file, &flags, unsigned_len) ||
- write_footer(file));
+ return write_header(event_length) ||
+ write_data(buf, sizeof(buf)) ||
+ write_data(name, name_len) ||
+ write_data(buf1, buf1_length) ||
+ write_data(pos, val_len) ||
+ write_data(&flags, unsigned_len) ||
+ write_footer();
}
#endif
@@ -7986,13 +7962,13 @@ Create_file_log_event(THD* thd_arg, sql_exchange* ex,
Create_file_log_event::write_data_body()
*/
-bool Create_file_log_event::write_data_body(IO_CACHE* file)
+bool Create_file_log_event::write_data_body()
{
bool res;
- if ((res= Load_log_event::write_data_body(file)) || fake_base)
+ if ((res= Load_log_event::write_data_body()) || fake_base)
return res;
- return (my_b_safe_write(file, (uchar*) "", 1) ||
- my_b_safe_write(file, (uchar*) block, block_len));
+ return write_data("", 1) ||
+ write_data(block, block_len);
}
@@ -8000,14 +7976,14 @@ bool Create_file_log_event::write_data_body(IO_CACHE* file)
Create_file_log_event::write_data_header()
*/
-bool Create_file_log_event::write_data_header(IO_CACHE* file)
+bool Create_file_log_event::write_data_header()
{
bool res;
uchar buf[CREATE_FILE_HEADER_LEN];
- if ((res= Load_log_event::write_data_header(file)) || fake_base)
+ if ((res= Load_log_event::write_data_header()) || fake_base)
return res;
int4store(buf + CF_FILE_ID_OFFSET, file_id);
- return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
+ return write_data(buf, CREATE_FILE_HEADER_LEN) != 0;
}
@@ -8015,11 +7991,11 @@ bool Create_file_log_event::write_data_header(IO_CACHE* file)
Create_file_log_event::write_base()
*/
-bool Create_file_log_event::write_base(IO_CACHE* file)
+bool Create_file_log_event::write_base()
{
bool res;
fake_base= 1; // pretend we are Load event
- res= write(file);
+ res= write();
fake_base= 0;
return res;
}
@@ -8166,6 +8142,7 @@ int Create_file_log_event::do_apply_event(rpl_group_info *rgi)
char *ext;
int fd = -1;
IO_CACHE file;
+ Log_event_writer lew(&file);
int error = 1;
Relay_log_info const *rli= rgi->rli;
@@ -8191,7 +8168,8 @@ int Create_file_log_event::do_apply_event(rpl_group_info *rgi)
// a trick to avoid allocating another buffer
fname= fname_buf;
fname_len= (uint) (strmov(ext, ".data") - fname);
- if (write_base(&file))
+ writer= &lew;
+ if (write_base())
{
strmov(ext, ".info"); // to have it right in the error message
rli->report(ERROR_LEVEL, my_errno, rgi->gtid_info(),
@@ -8282,14 +8260,14 @@ Append_block_log_event::Append_block_log_event(const char* buf, uint len,
*/
#ifndef MYSQL_CLIENT
-bool Append_block_log_event::write(IO_CACHE* file)
+bool Append_block_log_event::write()
{
uchar buf[APPEND_BLOCK_HEADER_LEN];
int4store(buf + AB_FILE_ID_OFFSET, file_id);
- return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
- wrapper_my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
- wrapper_my_b_safe_write(file, (uchar*) block, block_len) ||
- write_footer(file));
+ return write_header(APPEND_BLOCK_HEADER_LEN + block_len) ||
+ write_data(buf, APPEND_BLOCK_HEADER_LEN) ||
+ write_data(block, block_len) ||
+ write_footer();
}
#endif
@@ -8442,13 +8420,13 @@ Delete_file_log_event::Delete_file_log_event(const char* buf, uint len,
*/
#ifndef MYSQL_CLIENT
-bool Delete_file_log_event::write(IO_CACHE* file)
+bool Delete_file_log_event::write()
{
uchar buf[DELETE_FILE_HEADER_LEN];
int4store(buf + DF_FILE_ID_OFFSET, file_id);
- return (write_header(file, sizeof(buf)) ||
- wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
- write_footer(file));
+ return write_header(sizeof(buf)) ||
+ write_data(buf, sizeof(buf)) ||
+ write_footer();
}
#endif
@@ -8542,13 +8520,13 @@ Execute_load_log_event::Execute_load_log_event(const char* buf, uint len,
*/
#ifndef MYSQL_CLIENT
-bool Execute_load_log_event::write(IO_CACHE* file)
+bool Execute_load_log_event::write()
{
uchar buf[EXEC_LOAD_HEADER_LEN];
int4store(buf + EL_FILE_ID_OFFSET, file_id);
- return (write_header(file, sizeof(buf)) ||
- wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
- write_footer(file));
+ return write_header(sizeof(buf)) ||
+ write_data(buf, sizeof(buf)) ||
+ write_footer();
}
#endif
@@ -8777,14 +8755,14 @@ ulong Execute_load_query_log_event::get_post_header_size_for_derived()
#ifndef MYSQL_CLIENT
bool
-Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
+Execute_load_query_log_event::write_post_header_for_derived()
{
uchar buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
int4store(buf, file_id);
int4store(buf + 4, fn_pos_start);
int4store(buf + 4 + 4, fn_pos_end);
*(buf + 4 + 4 + 4)= (uchar) dup_handling;
- return wrapper_my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
+ return write_data(buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
}
#endif
@@ -8928,40 +8906,6 @@ Execute_load_query_log_event::do_apply_event(rpl_group_info *rgi)
**************************************************************************/
/*
- sql_ex_info::write_data()
-*/
-
-bool sql_ex_info::write_data(IO_CACHE* file)
-{
- if (new_format())
- {
- return (write_str(file, field_term, (uint) field_term_len) ||
- write_str(file, enclosed, (uint) enclosed_len) ||
- write_str(file, line_term, (uint) line_term_len) ||
- write_str(file, line_start, (uint) line_start_len) ||
- write_str(file, escaped, (uint) escaped_len) ||
- my_b_safe_write(file,(uchar*) &opt_flags,1));
- }
- else
- {
- /**
- @todo This is sensitive to field padding. We should write a
- char[7], not an old_sql_ex. /sven
- */
- old_sql_ex old_ex;
- old_ex.field_term= *field_term;
- old_ex.enclosed= *enclosed;
- old_ex.line_term= *line_term;
- old_ex.line_start= *line_start;
- old_ex.escaped= *escaped;
- old_ex.opt_flags= opt_flags;
- old_ex.empty_flags=empty_flags;
- return my_b_safe_write(file, (uchar*) &old_ex, sizeof(old_ex)) != 0;
- }
-}
-
-
-/*
sql_ex_info::init()
*/
@@ -9011,12 +8955,54 @@ const char *sql_ex_info::init(const char *buf, const char *buf_end,
return buf;
}
+#ifndef MYSQL_CLIENT
+/*
+ write_str()
+*/
+
+static bool write_str(Log_event_writer *writer, const char *str, uint length)
+{
+ uchar tmp[1];
+ tmp[0]= (uchar) length;
+ return (writer->write_data(tmp, sizeof(tmp)) ||
+ writer->write_data((uchar*) str, length));
+}
+
+/*
+ sql_ex_info::write_data()
+*/
+
+bool sql_ex_info::write_data(Log_event_writer *writer)
+{
+ if (new_format())
+ {
+ return write_str(writer, field_term, field_term_len) ||
+ write_str(writer, enclosed, enclosed_len) ||
+ write_str(writer, line_term, line_term_len) ||
+ write_str(writer, line_start, line_start_len) ||
+ write_str(writer, escaped, escaped_len) ||
+ writer->write_data((uchar*) &opt_flags, 1);
+ }
+ else
+ {
+ uchar old_ex[7];
+ old_ex[0]= *field_term;
+ old_ex[1]= *enclosed;
+ old_ex[2]= *line_term;
+ old_ex[3]= *line_start;
+ old_ex[4]= *escaped;
+ old_ex[5]= opt_flags;
+ old_ex[6]= empty_flags;
+ return writer->write_data(old_ex, sizeof(old_ex));
+ }
+}
+
+
/**************************************************************************
Rows_log_event member functions
**************************************************************************/
-#ifndef MYSQL_CLIENT
Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
MY_BITMAP const *cols, bool is_transactional,
Log_event_type event_type)
@@ -9965,7 +9951,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi)
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
#ifndef MYSQL_CLIENT
-bool Rows_log_event::write_data_header(IO_CACHE *file)
+bool Rows_log_event::write_data_header()
{
uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer
DBUG_ASSERT(m_table_id != ~0UL);
@@ -9973,14 +9959,14 @@ bool Rows_log_event::write_data_header(IO_CACHE *file)
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
- return (wrapper_my_b_safe_write(file, buf, 6));
+ return (write_data(buf, 6));
});
int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + RW_FLAGS_OFFSET, m_flags);
- return (wrapper_my_b_safe_write(file, buf, ROWS_HEADER_LEN));
+ return write_data(buf, ROWS_HEADER_LEN);
}
-bool Rows_log_event::write_data_body(IO_CACHE*file)
+bool Rows_log_event::write_data_body()
{
/*
Note that this should be the number of *bits*, not the number of
@@ -9993,11 +9979,10 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
- res= res || wrapper_my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
+ res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf));
DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
- res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols.bitmap,
- no_bytes_in_map(&m_cols));
+ res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols));
/*
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
*/
@@ -10005,11 +9990,11 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
{
DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
no_bytes_in_map(&m_cols_ai));
- res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
- no_bytes_in_map(&m_cols_ai));
+ res= res || write_data((uchar*)m_cols_ai.bitmap,
+ no_bytes_in_map(&m_cols_ai));
}
DBUG_DUMP("rows", m_rows_buf, data_size);
- res= res || wrapper_my_b_safe_write(file, m_rows_buf, (size_t) data_size);
+ res= res || write_data(m_rows_buf, (size_t) data_size);
return res;
@@ -10107,16 +10092,16 @@ bool Annotate_rows_log_event::is_valid() const
}
#ifndef MYSQL_CLIENT
-bool Annotate_rows_log_event::write_data_header(IO_CACHE *file)
+bool Annotate_rows_log_event::write_data_header()
{
return 0;
}
#endif
#ifndef MYSQL_CLIENT
-bool Annotate_rows_log_event::write_data_body(IO_CACHE *file)
+bool Annotate_rows_log_event::write_data_body()
{
- return wrapper_my_b_safe_write(file, (uchar*) m_query_txt, m_query_len);
+ return write_data(m_query_txt, m_query_len);
}
#endif
@@ -10826,7 +10811,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi)
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
#ifndef MYSQL_CLIENT
-bool Table_map_log_event::write_data_header(IO_CACHE *file)
+bool Table_map_log_event::write_data_header()
{
DBUG_ASSERT(m_table_id != ~0UL);
uchar buf[TABLE_MAP_HEADER_LEN];
@@ -10834,14 +10819,14 @@ bool Table_map_log_event::write_data_header(IO_CACHE *file)
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
- return (wrapper_my_b_safe_write(file, buf, 6));
+ return (write_data(buf, 6));
});
int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + TM_FLAGS_OFFSET, m_flags);
- return (wrapper_my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
+ return write_data(buf, TABLE_MAP_HEADER_LEN);
}
-bool Table_map_log_event::write_data_body(IO_CACHE *file)
+bool Table_map_log_event::write_data_body()
{
DBUG_ASSERT(m_dbnam != NULL);
DBUG_ASSERT(m_tblnam != NULL);
@@ -10862,15 +10847,15 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file)
uchar mbuf[MAX_INT_WIDTH];
uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
- return (wrapper_my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
- wrapper_my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) ||
- wrapper_my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
- wrapper_my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) ||
- wrapper_my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
- wrapper_my_b_safe_write(file, m_coltype, m_colcnt) ||
- wrapper_my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
- wrapper_my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
- wrapper_my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
+ return write_data(dbuf, sizeof(dbuf)) ||
+ write_data(m_dbnam, m_dblen+1) ||
+ write_data(tbuf, sizeof(tbuf)) ||
+ write_data(m_tblnam, m_tbllen+1) ||
+ write_data(cbuf, (size_t) (cbuf_end - cbuf)) ||
+ write_data(m_coltype, m_colcnt) ||
+ write_data(mbuf, (size_t) (mbuf_end - mbuf)) ||
+ write_data(m_field_metadata, m_field_metadata_size),
+ write_data(m_null_bits, (m_colcnt + 7) / 8);
}
#endif
@@ -12432,35 +12417,27 @@ Incident_log_event::do_apply_event(rpl_group_info *rgi)
}
#endif
+#ifdef MYSQL_SERVER
bool
-Incident_log_event::write_data_header(IO_CACHE *file)
+Incident_log_event::write_data_header()
{
DBUG_ENTER("Incident_log_event::write_data_header");
DBUG_PRINT("enter", ("m_incident: %d", m_incident));
uchar buf[sizeof(int16)];
int2store(buf, (int16) m_incident);
-#ifndef MYSQL_CLIENT
- DBUG_RETURN(wrapper_my_b_safe_write(file, buf, sizeof(buf)));
-#else
- DBUG_RETURN(my_b_safe_write(file, buf, sizeof(buf)));
-#endif
+ DBUG_RETURN(write_data(buf, sizeof(buf)));
}
bool
-Incident_log_event::write_data_body(IO_CACHE *file)
+Incident_log_event::write_data_body()
{
uchar tmp[1];
DBUG_ENTER("Incident_log_event::write_data_body");
tmp[0]= (uchar) m_message.length;
- crc= my_checksum(crc, (uchar*) tmp, 1);
- if (m_message.length > 0)
- {
- crc= my_checksum(crc, (uchar*) m_message.str, m_message.length);
- // todo: report a bug on write_str accepts uint but treats it as uchar
- }
- DBUG_RETURN(write_str(file, m_message.str, (uint) m_message.length));
+ DBUG_RETURN(write_data(tmp, sizeof(tmp)) ||
+ write_data(m_message.str, m_message.length));
}
-
+#endif
#ifdef MYSQL_CLIENT
/**
diff --git a/sql/log_event.h b/sql/log_event.h
index 4c2dca14f3b..77b3d1f5216 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -145,64 +145,10 @@ class String;
#define LINE_START_EMPTY 0x8
#define ESCAPED_EMPTY 0x10
-/*****************************************************************************
-
- old_sql_ex struct
-
- ****************************************************************************/
-struct old_sql_ex
-{
- char field_term;
- char enclosed;
- char line_term;
- char line_start;
- char escaped;
- char opt_flags;
- char empty_flags;
-};
-
#define NUM_LOAD_DELIM_STRS 5
/*****************************************************************************
- sql_ex_info struct
-
- ****************************************************************************/
-struct sql_ex_info
-{
- sql_ex_info() {} /* Remove gcc warning */
- const char* field_term;
- const char* enclosed;
- const char* line_term;
- const char* line_start;
- const char* escaped;
- int cached_new_format;
- uint8 field_term_len,enclosed_len,line_term_len,line_start_len, escaped_len;
- char opt_flags;
- char empty_flags;
-
- // store in new format even if old is possible
- void force_new_format() { cached_new_format = 1;}
- int data_size()
- {
- return (new_format() ?
- field_term_len + enclosed_len + line_term_len +
- line_start_len + escaped_len + 6 : 7);
- }
- bool write_data(IO_CACHE* file);
- const char* init(const char* buf, const char* buf_end, bool use_new_format);
- bool new_format()
- {
- return ((cached_new_format != -1) ? cached_new_format :
- (cached_new_format=(field_term_len > 1 ||
- enclosed_len > 1 ||
- line_term_len > 1 || line_start_len > 1 ||
- escaped_len > 1)));
- }
-};
-
-/*****************************************************************************
-
MySQL Binary Log
This log consists of events. Each event has a fixed-length header,
@@ -843,6 +789,33 @@ typedef struct st_print_event_info
#endif
/**
+ This class encapsulates writing of Log_event objects to IO_CACHE.
+ Automatically calculates the checksum if necessary.
+*/
+class Log_event_writer
+{
+public:
+ ulonglong bytes_written;
+ uint checksum_len;
+ int write(Log_event *ev);
+ int write_header(uchar *pos, size_t len);
+ int write_data(const uchar *pos, size_t len);
+ int write_footer();
+ my_off_t pos() { return my_b_safe_tell(file); }
+
+Log_event_writer(IO_CACHE *file_arg) : bytes_written(0), file(file_arg) { }
+
+private:
+ IO_CACHE *file;
+ /**
+ Placeholder for event checksum while writing to binlog.
+ */
+ ha_checksum crc;
+
+ int write_internal(const uchar *pos, size_t len);
+};
+
+/**
the struct aggregates two paramenters that identify an event
uniquely in scope of communication of a particular master and slave couple.
I.e there can not be 2 events from the same staying connected master which
@@ -1108,10 +1081,7 @@ public:
*/
ulong slave_exec_mode;
- /**
- Placeholder for event checksum while writing to binlog.
- */
- ha_checksum crc;
+ Log_event_writer *writer;
#ifdef MYSQL_SERVER
THD* thd;
@@ -1143,6 +1113,7 @@ public:
}
#else
Log_event() : temp_buf(0), flags(0) {}
+ ha_checksum crc;
/* print*() functions are used by mysqlbinlog */
virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0;
void print_timestamp(IO_CACHE* file, time_t *ts = 0);
@@ -1216,23 +1187,26 @@ public:
/* Placement version of the above operators */
static void *operator new(size_t, void* ptr) { return ptr; }
static void operator delete(void*, void*) { }
- bool wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong data_length);
#ifdef MYSQL_SERVER
- bool write_header(IO_CACHE* file, ulong data_length);
- bool write_footer(IO_CACHE* file);
+ bool write_header(ulong data_length);
+ bool write_data(const uchar *buf, ulong data_length)
+ { return writer->write_data(buf, data_length); }
+ bool write_data(const char *buf, ulong data_length)
+ { return write_data((uchar*)buf, data_length); }
+ bool write_footer()
+ { return writer->write_footer(); }
+
my_bool need_checksum();
- virtual bool write(IO_CACHE* file)
+ virtual bool write()
{
- return(write_header(file, get_data_size()) ||
- write_data_header(file) ||
- write_data_body(file) ||
- write_footer(file));
+ return write_header(get_data_size()) || write_data_header() ||
+ write_data_body() || write_footer();
}
- virtual bool write_data_header(IO_CACHE* file __attribute__((unused)))
+ virtual bool write_data_header()
{ return 0; }
- virtual bool write_data_body(IO_CACHE* file __attribute__((unused)))
+ virtual bool write_data_body()
{ return 0; }
/* Return start of query time or current time */
@@ -1989,8 +1963,8 @@ public:
static int dummy_event(String *packet, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg);
static int begin_event(String *packet, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg);
#ifdef MYSQL_SERVER
- bool write(IO_CACHE* file);
- virtual bool write_post_header_for_derived(IO_CACHE* file) { return FALSE; }
+ bool write();
+ virtual bool write_post_header_for_derived() { return FALSE; }
#endif
bool is_valid() const { return query != 0; }
@@ -2040,6 +2014,42 @@ public: /* !!! Public in this patch to allow old usage */
};
+/*****************************************************************************
+ sql_ex_info struct
+ ****************************************************************************/
+struct sql_ex_info
+{
+ sql_ex_info() {} /* Remove gcc warning */
+ const char* field_term;
+ const char* enclosed;
+ const char* line_term;
+ const char* line_start;
+ const char* escaped;
+ int cached_new_format;
+ uint8 field_term_len,enclosed_len,line_term_len,line_start_len, escaped_len;
+ char opt_flags;
+ char empty_flags;
+
+ // store in new format even if old is possible
+ void force_new_format() { cached_new_format = 1;}
+ int data_size()
+ {
+ return (new_format() ?
+ field_term_len + enclosed_len + line_term_len +
+ line_start_len + escaped_len + 6 : 7);
+ }
+ bool write_data(Log_event_writer *writer);
+ const char* init(const char* buf, const char* buf_end, bool use_new_format);
+ bool new_format()
+ {
+ return ((cached_new_format != -1) ? cached_new_format :
+ (cached_new_format=(field_term_len > 1 ||
+ enclosed_len > 1 ||
+ line_term_len > 1 || line_start_len > 1 ||
+ escaped_len > 1)));
+ }
+};
+
/**
@class Load_log_event
@@ -2333,8 +2343,8 @@ public:
return sql_ex.new_format() ? NEW_LOAD_EVENT: LOAD_EVENT;
}
#ifdef MYSQL_SERVER
- bool write_data_header(IO_CACHE* file);
- bool write_data_body(IO_CACHE* file);
+ bool write_data_header();
+ bool write_data_body();
#endif
bool is_valid() const { return table_name != 0; }
int get_data_size()
@@ -2422,7 +2432,7 @@ public:
my_off_t get_header_len(my_off_t l __attribute__((unused)))
{ return LOG_EVENT_MINIMAL_HEADER_LEN; }
#ifdef MYSQL_SERVER
- bool write(IO_CACHE* file);
+ bool write();
#endif
bool is_valid() const { return server_version[0] != 0; }
int get_data_size()
@@ -2492,7 +2502,7 @@ public:
}
Log_event_type get_type_code() { return FORMAT_DESCRIPTION_EVENT;}
#ifdef MYSQL_SERVER
- bool write(IO_CACHE* file);
+ bool write();
#endif
bool header_is_valid() const
{
@@ -2601,7 +2611,7 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg,
const char* get_var_type_name();
int get_data_size() { return 9; /* sizeof(type) + sizeof(val) */;}
#ifdef MYSQL_SERVER
- bool write(IO_CACHE* file);
+ bool write();
#endif
bool is_valid() const { return 1; }
bool is_part_of_group() { return 1; }
@@ -2681,7 +2691,7 @@ class Rand_log_event: public Log_event
Log_event_type get_type_code() { return RAND_EVENT;}
int get_data_size() { return 16; /* sizeof(ulonglong) * 2*/ }
#ifdef MYSQL_SERVER
- bool write(IO_CACHE* file);
+ bool write();
#endif
bool is_valid() const { return 1; }
bool is_part_of_group() { return 1; }
@@ -2731,7 +2741,7 @@ class Xid_log_event: public Log_event
Log_event_type get_type_code() { return XID_EVENT;}
int get_data_size() { return sizeof(xid); }
#ifdef MYSQL_SERVER
- bool write(IO_CACHE* file);
+ bool write();
#endif
bool is_valid() const { return 1; }
@@ -2792,7 +2802,7 @@ public:
~User_var_log_event() {}
Log_event_type get_type_code() { return USER_VAR_EVENT;}
#ifdef MYSQL_SERVER
- bool write(IO_CACHE* file);
+ bool write();
/*
Getter and setter for deferred User-event.
Returns true if the event is not applied directly
@@ -2944,7 +2954,7 @@ public:
int get_data_size() { return ident_len + ROTATE_HEADER_LEN;}
bool is_valid() const { return new_log_ident != 0; }
#ifdef MYSQL_SERVER
- bool write(IO_CACHE* file);
+ bool write();
#endif
private:
@@ -2977,7 +2987,7 @@ public:
int get_data_size() { return binlog_file_len + BINLOG_CHECKPOINT_HEADER_LEN;}
bool is_valid() const { return binlog_file_name != 0; }
#ifdef MYSQL_SERVER
- bool write(IO_CACHE* file);
+ bool write();
enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@@ -3105,7 +3115,7 @@ public:
}
bool is_valid() const { return seq_no != 0; }
#ifdef MYSQL_SERVER
- bool write(IO_CACHE *file);
+ bool write();
static int make_compatible_event(String *packet, bool *need_dummy_event,
ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg);
static bool peek(const char *event_start, size_t event_len,
@@ -3220,7 +3230,7 @@ public:
bool is_valid() const { return list != NULL; }
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
bool to_packet(String *packet);
- bool write(IO_CACHE *file);
+ bool write();
virtual int do_apply_event(rpl_group_info *rgi);
enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
@@ -3291,13 +3301,13 @@ public:
}
bool is_valid() const { return inited_from_old || block != 0; }
#ifdef MYSQL_SERVER
- bool write_data_header(IO_CACHE* file);
- bool write_data_body(IO_CACHE* file);
+ bool write_data_header();
+ bool write_data_body();
/*
Cut out Create_file extentions and
write it as Load event - used on the slave
*/
- bool write_base(IO_CACHE* file);
+ bool write_base();
#endif
private:
@@ -3351,7 +3361,7 @@ public:
int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;}
bool is_valid() const { return block != 0; }
#ifdef MYSQL_SERVER
- bool write(IO_CACHE* file);
+ bool write();
const char* get_db() { return db; }
#endif
@@ -3392,7 +3402,7 @@ public:
int get_data_size() { return DELETE_FILE_HEADER_LEN ;}
bool is_valid() const { return file_id != 0; }
#ifdef MYSQL_SERVER
- bool write(IO_CACHE* file);
+ bool write();
const char* get_db() { return db; }
#endif
@@ -3432,7 +3442,7 @@ public:
int get_data_size() { return EXEC_LOAD_HEADER_LEN ;}
bool is_valid() const { return file_id != 0; }
#ifdef MYSQL_SERVER
- bool write(IO_CACHE* file);
+ bool write();
const char* get_db() { return db; }
#endif
@@ -3532,7 +3542,7 @@ public:
ulong get_post_header_size_for_derived();
#ifdef MYSQL_SERVER
- bool write_post_header_for_derived(IO_CACHE* file);
+ bool write_post_header_for_derived();
#endif
private:
@@ -3596,8 +3606,8 @@ public:
virtual bool is_part_of_group() { return 1; }
#ifndef MYSQL_CLIENT
- virtual bool write_data_header(IO_CACHE*);
- virtual bool write_data_body(IO_CACHE*);
+ virtual bool write_data_header();
+ virtual bool write_data_body();
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
@@ -4012,8 +4022,8 @@ public:
virtual int get_data_size() { return (uint) m_data_size; }
#ifdef MYSQL_SERVER
virtual int save_field_metadata();
- virtual bool write_data_header(IO_CACHE *file);
- virtual bool write_data_body(IO_CACHE *file);
+ virtual bool write_data_header();
+ virtual bool write_data_body();
virtual const char *get_db() { return m_dbnam; }
#endif
@@ -4211,8 +4221,8 @@ public:
#endif
#ifdef MYSQL_SERVER
- virtual bool write_data_header(IO_CACHE *file);
- virtual bool write_data_body(IO_CACHE *file);
+ virtual bool write_data_header();
+ virtual bool write_data_body();
virtual const char *get_db() { return m_table->s->db.str; }
#endif
/*
@@ -4674,6 +4684,9 @@ public:
#ifdef MYSQL_SERVER
void pack_info(THD *thd, Protocol*);
+
+ virtual bool write_data_header();
+ virtual bool write_data_body();
#endif
Incident_log_event(const char *buf, uint event_len,
@@ -4689,9 +4702,6 @@ public:
virtual int do_apply_event(rpl_group_info *rgi);
#endif
- virtual bool write_data_header(IO_CACHE *file);
- virtual bool write_data_body(IO_CACHE *file);
-
virtual Log_event_type get_type_code() { return INCIDENT_EVENT; }
virtual bool is_valid() const
@@ -4752,6 +4762,14 @@ private:
uint ident_len;
};
+inline int Log_event_writer::write(Log_event *ev)
+{
+ ev->writer= this;
+ int res= ev->write();
+ IF_DBUG(ev->writer= 0,); // writer must be set before every Log_event::write
+ return res;
+}
+
/**
The function is called by slave applier in case there are
active table filtering rules to force gathering events associated
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc
index 4779e90d815..ce1bf614bc9 100644
--- a/sql/log_event_old.cc
+++ b/sql/log_event_old.cc
@@ -1753,7 +1753,7 @@ Old_rows_log_event::do_update_pos(rpl_group_info *rgi)
#ifndef MYSQL_CLIENT
-bool Old_rows_log_event::write_data_header(IO_CACHE *file)
+bool Old_rows_log_event::write_data_header()
{
uchar buf[ROWS_HEADER_LEN]; // No need to init the buffer
@@ -1765,15 +1765,15 @@ bool Old_rows_log_event::write_data_header(IO_CACHE *file)
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
- return (my_b_safe_write(file, buf, 6));
+ return write_data(buf, 6);
});
int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + RW_FLAGS_OFFSET, m_flags);
- return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
+ return write_data(buf, ROWS_HEADER_LEN);
}
-bool Old_rows_log_event::write_data_body(IO_CACHE*file)
+bool Old_rows_log_event::write_data_body()
{
/*
Note that this should be the number of *bits*, not the number of
@@ -1790,13 +1790,12 @@ bool Old_rows_log_event::write_data_body(IO_CACHE*file)
DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
- res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
+ res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf));
DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
- res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
- no_bytes_in_map(&m_cols));
+ res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols));
DBUG_DUMP("rows", m_rows_buf, data_size);
- res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
+ res= res || write_data(m_rows_buf, (size_t) data_size);
return res;
diff --git a/sql/log_event_old.h b/sql/log_event_old.h
index ed07f753e7a..edc74ca1a6f 100644
--- a/sql/log_event_old.h
+++ b/sql/log_event_old.h
@@ -134,8 +134,8 @@ public:
ulong get_table_id() const { return m_table_id; }
#ifndef MYSQL_CLIENT
- virtual bool write_data_header(IO_CACHE *file);
- virtual bool write_data_body(IO_CACHE *file);
+ virtual bool write_data_header();
+ virtual bool write_data_body();
virtual const char *get_db() { return m_table->s->db.str; }
#endif
/*
diff --git a/sql/slave.cc b/sql/slave.cc
index de6bf95b50e..13acc8c62ba 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -5501,7 +5501,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->master_log_name, rev.new_log_ident);
mysql_mutex_lock(log_lock);
- if (likely(!fdle.write(rli->relay_log.get_log_file()) &&
+ if (likely(!rli->relay_log.write_event(&fdle) &&
!rli->relay_log.flush_and_sync(NULL)))
{
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc
index 0bc04ebb066..36917674128 100644
--- a/sql/wsrep_binlog.cc
+++ b/sql/wsrep_binlog.cc
@@ -445,6 +445,7 @@ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
char filename[PATH_MAX]= {0};
File file;
IO_CACHE cache;
+ Log_event_writer writer(&cache);
Format_description_log_event *ev= wsrep_get_apply_format(thd);
int len= my_snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld_v2.log",
@@ -476,7 +477,7 @@ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
goto cleanup2;
}
- if (ev->write(&cache) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) ||
+ if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) ||
flush_io_cache(&cache))
{
WSREP_ERROR("Failed to write to '%s'.", filename);
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 0a44dd278ac..a785b8764cf 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -1215,6 +1215,7 @@ int wsrep_to_buf_helper(
THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
{
IO_CACHE tmp_io_cache;
+ Log_event_writer writer(&tmp_io_cache);
if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
65536, MYF(MY_WME)))
return 1;
@@ -1222,7 +1223,7 @@ int wsrep_to_buf_helper(
Format_description_log_event *tmp_fd= new Format_description_log_event(4);
tmp_fd->checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options;
- tmp_fd->write(&tmp_io_cache);
+ writer.write(tmp_fd);
delete tmp_fd;
#ifdef GTID_SUPPORT
@@ -1230,7 +1231,7 @@ int wsrep_to_buf_helper(
{
Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next);
if (!gtid_ev.is_valid()) ret= 0;
- if (!ret && gtid_ev.write(&tmp_io_cache)) ret= 1;
+ if (!ret && writer.write(&gtid_ev)) ret= 1;
}
#endif /* GTID_SUPPORT */
@@ -1240,12 +1241,12 @@ int wsrep_to_buf_helper(
Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
thd->wsrep_TOI_pre_query_len,
FALSE, FALSE, FALSE, 0);
- if (ev.write(&tmp_io_cache)) ret= 1;
+ if (writer.write(&ev)) ret= 1;
}
/* continue to append the actual query */
Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
- if (!ret && ev.write(&tmp_io_cache)) ret= 1;
+ if (!ret && writer.write(&ev)) ret= 1;
if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
close_cached_file(&tmp_io_cache);
return ret;