summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc1914
1 files changed, 1860 insertions, 54 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index c8f8ff40700..6e256a0c295 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -21,11 +21,14 @@
#pragma implementation // gcc: Class implementation
#endif
-#include "mysql_priv.h"
+#include "mysql_priv.h"
#include "slave.h"
#include "rpl_filter.h"
#include <my_dir.h>
#endif /* MYSQL_CLIENT */
+#include <base64.h>
+#include <my_bitmap.h>
+#include <my_vle.h>
#define log_cs &my_charset_latin1
@@ -232,6 +235,7 @@ char *str_to_hex(char *to, const char *from, uint len)
commands just before it prints a query.
*/
+#ifdef MYSQL_CLIENT
static void print_set_option(FILE* file, uint32 bits_changed, uint32 option,
uint32 flags, const char* name, bool* need_comma)
{
@@ -243,6 +247,7 @@ static void print_set_option(FILE* file, uint32 bits_changed, uint32 option,
*need_comma= 1;
}
}
+#endif
/**************************************************************************
Log_event methods (= the parent class of all events)
@@ -271,6 +276,10 @@ const char* Log_event::get_type_str()
case XID_EVENT: return "Xid";
case USER_VAR_EVENT: return "User var";
case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
+ case TABLE_MAP_EVENT: return "Table_map";
+ case WRITE_ROWS_EVENT: return "Write_rows";
+ case UPDATE_ROWS_EVENT: return "Update_rows";
+ case DELETE_ROWS_EVENT: return "Delete_rows";
case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
default: return "Unknown"; /* impossible */
@@ -778,6 +787,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
DBUG_RETURN(NULL); // general sanity check - will fail on a partial read
}
+ /* To check the integrity of the Log_event_type enumeration */
+ DBUG_ASSERT(buf[EVENT_TYPE_OFFSET] < ENUM_END_EVENT);
+
switch(buf[EVENT_TYPE_OFFSET]) {
case QUERY_EVENT:
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
@@ -829,6 +841,20 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
case FORMAT_DESCRIPTION_EVENT:
ev = new Format_description_log_event(buf, event_len, description_event);
break;
+#if defined(HAVE_REPLICATION) && defined(HAVE_ROW_BASED_REPLICATION)
+ case WRITE_ROWS_EVENT:
+ ev = new Write_rows_log_event(buf, event_len, description_event);
+ break;
+ case UPDATE_ROWS_EVENT:
+ ev = new Update_rows_log_event(buf, event_len, description_event);
+ break;
+ case DELETE_ROWS_EVENT:
+ ev = new Delete_rows_log_event(buf, event_len, description_event);
+ break;
+ case TABLE_MAP_EVENT:
+ ev = new Table_map_log_event(buf, event_len, description_event);
+ break;
+#endif
case BEGIN_LOAD_QUERY_EVENT:
ev = new Begin_load_query_log_event(buf, event_len, description_event);
break;
@@ -952,6 +978,24 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info)
}
+void Log_event::print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info)
+{
+ uchar *ptr= (uchar*)temp_buf;
+ my_off_t size= uint4korr(ptr + EVENT_LEN_OFFSET);
+
+ char *tmp_str=
+ (char *) my_malloc(base64_needed_encoded_length(size), MYF(MY_WME));
+ if (!tmp_str) {
+ fprintf(stderr, "\nError: Out of memory. "
+ "Could not print correct binlog event.\n");
+ return;
+ }
+ int res= base64_encode(ptr, size, tmp_str);
+ fprintf(file, "\nBINLOG '\n%s\n';\n", tmp_str);
+ my_free(tmp_str, MYF(0));
+}
+
+
/*
Log_event::print_timestamp()
*/
@@ -1714,7 +1758,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli, const char *query
clear_all_errors(thd, rli); /* Can ignore query */
else
{
- slave_print_error(rli,expected_error,
+ slave_print_msg(ERROR_LEVEL, rli, expected_error,
"\
Query partially completed on the master (error on master: %d) \
and was aborted. There is a chance that your master is inconsistent at this \
@@ -1743,16 +1787,16 @@ compare_errors:
!ignored_error_code(actual_error) &&
!ignored_error_code(expected_error))
{
- slave_print_error(rli, 0,
- "\
-Query caused different errors on master and slave. \
+ slave_print_msg(ERROR_LEVEL, rli, 0,
+ "\
+Query caused different errors on master and slave. \
Error on master: '%s' (%d), Error on slave: '%s' (%d). \
Default database: '%s'. Query: '%s'",
- ER_SAFE(expected_error),
- expected_error,
- actual_error ? thd->net.last_error: "no error",
- actual_error,
- print_slave_db_safe(db), query_arg);
+ ER_SAFE(expected_error),
+ expected_error,
+ actual_error ? thd->net.last_error: "no error",
+ actual_error,
+ print_slave_db_safe(db), query_arg);
thd->query_error= 1;
}
/*
@@ -1769,11 +1813,11 @@ Default database: '%s'. Query: '%s'",
*/
else if (thd->query_error || thd->is_fatal_error)
{
- slave_print_error(rli,actual_error,
- "Error '%s' on query. Default database: '%s'. Query: '%s'",
- (actual_error ? thd->net.last_error :
- "unexpected success or fatal error"),
- print_slave_db_safe(thd->db), query_arg);
+ slave_print_msg(ERROR_LEVEL, rli, actual_error,
+ "Error '%s' on query. Default database: '%s'. Query: '%s'",
+ (actual_error ? thd->net.last_error :
+ "unexpected success or fatal error"),
+ print_slave_db_safe(thd->db), query_arg);
thd->query_error= 1;
}
@@ -2055,6 +2099,25 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
+ post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN;
+ post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
+ post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
+ post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN;
+ /*
+ We here have the possibility to simulate a master of before we changed
+ the table map id to be stored in 6 bytes: when it was stored in 4
+ bytes (=> post_header_len was 6). This is used to test backward
+ compatibility.
+ This code can be removed after a few months (today is Dec 21st 2005),
+ when we know that the 4-byte masters are not deployed anymore (check
+ with Tomas Ulin first!), and the accompanying test (rpl_row_4_bytes)
+ too.
+ */
+ DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
+ post_header_len[TABLE_MAP_EVENT-1]=
+ post_header_len[WRITE_ROWS_EVENT-1]=
+ post_header_len[UPDATE_ROWS_EVENT-1]=
+ post_header_len[DELETE_ROWS_EVENT-1]= 6;);
post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1];
post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
}
@@ -2189,10 +2252,8 @@ int Format_description_log_event::exec_event(struct st_relay_log_info* rli)
As a transaction NEVER spans on 2 or more binlogs:
if we have an active transaction at this point, the master died
while writing the transaction to the binary log, i.e. while
- flushing the binlog cache to the binlog. As the write was started,
- the transaction had been committed on the master, so we lack of
- information to replay this transaction on the slave; all we can do
- is stop with error.
+ flushing the binlog cache to the binlog. XA guarantees that master has
+ rolled back. So we roll back.
Note: this event could be sent by the master to inform us of the
format of its binlog; in other words maybe it is not at its
original place when it comes to us; we'll know this by checking
@@ -2200,11 +2261,13 @@ int Format_description_log_event::exec_event(struct st_relay_log_info* rli)
*/
if (!artificial_event && created && thd->transaction.all.nht)
{
- slave_print_error(rli, 0, "Rolling back unfinished transaction (no "
- "COMMIT or ROLLBACK) from relay log. A probable cause "
- "is that the master died while writing the transaction "
- "to its binary log.");
- end_trans(thd, ROLLBACK);
+ /* This is not an error (XA is safe), just an information */
+ slave_print_msg(INFORMATION_LEVEL, rli, 0,
+ "Rolling back unfinished transaction (no COMMIT "
+ "or ROLLBACK in relay log). A probable cause is that "
+ "the master died while writing the transaction to "
+ "its binary log, thus rolled back too.");
+ rli->cleanup_context(thd, 1);
}
#endif
/*
@@ -2751,6 +2814,9 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
thd->query_length= 0; // Should not be needed
thd->query_error= 0;
clear_all_errors(thd, rli);
+
+ /* see Query_log_event::exec_event() and BUG#13360 */
+ DBUG_ASSERT(!rli->m_table_map.count());
/*
Usually mysql_init_query() is called by mysql_parse(), but we need it here
as the present method does not call mysql_parse().
@@ -2962,9 +3028,9 @@ error:
sql_errno=ER_UNKNOWN_ERROR;
err=ER(sql_errno);
}
- slave_print_error(rli,sql_errno,"\
+ slave_print_msg(ERROR_LEVEL, rli, sql_errno,"\
Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'",
- err, (char*)table_name, print_slave_db_safe(save_db));
+ err, (char*)table_name, print_slave_db_safe(save_db));
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
return 1;
}
@@ -2972,9 +3038,9 @@ Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'",
if (thd->is_fatal_error)
{
- slave_print_error(rli,ER_UNKNOWN_ERROR, "\
+ slave_print_msg(ERROR_LEVEL, rli, ER_UNKNOWN_ERROR, "\
Fatal error running LOAD DATA INFILE on table '%s'. Default database: '%s'",
- (char*)table_name, print_slave_db_safe(save_db));
+ (char*)table_name, print_slave_db_safe(save_db));
return 1;
}
@@ -3035,8 +3101,7 @@ void Rotate_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#ifndef MYSQL_CLIENT
-Rotate_log_event::Rotate_log_event(THD* thd_arg,
- const char* new_log_ident_arg,
+Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
uint ident_len_arg, ulonglong pos_arg,
uint flags_arg)
:Log_event(), new_log_ident(new_log_ident_arg),
@@ -3045,7 +3110,7 @@ Rotate_log_event::Rotate_log_event(THD* thd_arg,
{
#ifndef DBUG_OFF
char buff[22];
- DBUG_ENTER("Rotate_log_event::Rotate_log_event(THD*,...)");
+ DBUG_ENTER("Rotate_log_event::Rotate_log_event(...,flags)");
DBUG_PRINT("enter",("new_log_ident %s pos %s flags %lu", new_log_ident_arg,
llstr(pos_arg, buff), flags));
#endif
@@ -3353,12 +3418,24 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli)
Xid_log_event methods
**************************************************************************/
+#if !defined(DBUG_OFF) && !defined(MYSQL_CLIENT)
+/*
+ This static class member could be removed when mysqltest is made to support
+ a --replace-regex command: then tests which have XIDs in their output can
+ use this command to suppress non-deterministic XID values.
+*/
+my_bool Xid_log_event::show_xid;
+#endif
+
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
void Xid_log_event::pack_info(Protocol *protocol)
{
char buf[128], *pos;
pos= strmov(buf, "COMMIT /* xid=");
- pos= longlong10_to_str(xid, pos, 10);
+#if !defined(DBUG_OFF) && !defined(MYSQL_CLIENT)
+ if (show_xid)
+#endif
+ pos= longlong10_to_str(xid, pos, 10);
pos= strmov(pos, " */");
protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
}
@@ -4179,7 +4256,8 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
- slave_print_error(rli,my_errno, "Error in Create_file event: could not open file '%s'", fname_buf);
+ slave_print_msg(ERROR_LEVEL, rli, my_errno, "Error in Create_file event: "
+ "could not open file '%s'", fname_buf);
goto err;
}
@@ -4190,9 +4268,9 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
if (write_base(&file))
{
strmov(p, ".info"); // to have it right in the error message
- slave_print_error(rli,my_errno,
- "Error in Create_file event: could not write to file '%s'",
- fname_buf);
+ slave_print_msg(ERROR_LEVEL, rli, my_errno,
+ "Error in Create_file event: could not write to file '%s'",
+ fname_buf);
goto err;
}
end_io_cache(&file);
@@ -4204,12 +4282,14 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
MYF(MY_WME))) < 0)
{
- slave_print_error(rli,my_errno, "Error in Create_file event: could not open file '%s'", fname_buf);
+ slave_print_msg(ERROR_LEVEL, rli, my_errno, "Error in Create_file event: "
+ "could not open file '%s'", fname_buf);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
- slave_print_error(rli,my_errno, "Error in Create_file event: write to '%s' failed", fname_buf);
+ slave_print_msg(ERROR_LEVEL, rli, my_errno, "Error in Create_file event: "
+ "write to '%s' failed", fname_buf);
goto err;
}
error=0; // Everything is ok
@@ -4348,25 +4428,25 @@ int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
MYF(MY_WME))) < 0)
{
- slave_print_error(rli, my_errno,
- "Error in %s event: could not create file '%s'",
- get_type_str(), fname);
+ slave_print_msg(ERROR_LEVEL, rli, my_errno,
+ "Error in %s event: could not create file '%s'",
+ get_type_str(), fname);
goto err;
}
}
else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW,
MYF(MY_WME))) < 0)
{
- slave_print_error(rli, my_errno,
- "Error in %s event: could not open file '%s'",
- get_type_str(), fname);
+ slave_print_msg(ERROR_LEVEL, rli, my_errno,
+ "Error in %s event: could not open file '%s'",
+ get_type_str(), fname);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
- slave_print_error(rli, my_errno,
- "Error in %s event: write to '%s' failed",
- get_type_str(), fname);
+ slave_print_msg(ERROR_LEVEL, rli, my_errno,
+ "Error in %s event: write to '%s' failed",
+ get_type_str(), fname);
goto err;
}
error=0;
@@ -4573,7 +4653,8 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
- slave_print_error(rli,my_errno, "Error in Exec_load event: could not open file '%s'", fname);
+ slave_print_msg(ERROR_LEVEL, rli, my_errno, "Error in Exec_load event: "
+ "could not open file '%s'", fname);
goto err;
}
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
@@ -4581,7 +4662,8 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
rli->relay_log.description_event_for_exec)) ||
lev->get_type_code() != NEW_LOAD_EVENT)
{
- slave_print_error(rli,0, "Error in Exec_load event: file '%s' appears corrupted", fname);
+ slave_print_msg(ERROR_LEVEL, rli, 0, "Error in Exec_load event: "
+ "file '%s' appears corrupted", fname);
goto err;
}
@@ -4607,10 +4689,10 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
char *tmp= my_strdup(rli->last_slave_error,MYF(MY_WME));
if (tmp)
{
- slave_print_error(rli,
- rli->last_slave_errno, /* ok to re-use error code */
- "%s. Failed executing load from '%s'",
- tmp, fname);
+ slave_print_msg(ERROR_LEVEL, rli,
+ rli->last_slave_errno, /* ok to re-use error code */
+ "%s. Failed executing load from '%s'",
+ tmp, fname);
my_free(tmp,MYF(0));
}
goto err;
@@ -4816,7 +4898,7 @@ Execute_load_query_log_event::exec_event(struct st_relay_log_info* rli)
if (!(buf = my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
(FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME))))
{
- slave_print_error(rli, my_errno, "Not enough memory");
+ slave_print_msg(ERROR_LEVEL, rli, my_errno, "Not enough memory");
return 1;
}
@@ -4942,3 +5024,1727 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
}
return buf;
}
+
+
+#ifdef HAVE_ROW_BASED_REPLICATION
+
+/**************************************************************************
+ Rows_log_event member functions
+**************************************************************************/
+
+#ifndef MYSQL_CLIENT
+Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
+ MY_BITMAP const *cols, bool is_transactional)
+ : Log_event(thd_arg, 0, is_transactional),
+ m_table(tbl_arg),
+ m_table_id(tid),
+ m_width(tbl_arg->s->fields),
+ m_rows_buf(my_malloc(opt_binlog_rows_event_max_size * sizeof(*m_rows_buf), MYF(MY_WME))),
+ m_rows_cur(m_rows_buf),
+ m_rows_end(m_rows_buf + opt_binlog_rows_event_max_size),
+ m_flags(0)
+{
+ DBUG_ASSERT(m_table && m_table->s);
+ DBUG_ASSERT(m_table_id != ULONG_MAX);
+
+ if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS)
+ set_flags(NO_FOREIGN_KEY_CHECKS_F);
+ if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
+ set_flags(RELAXED_UNIQUE_CHECKS_F);
+ /* if bitmap_init fails, catched in is_valid() */
+ if (likely(!bitmap_init(&m_cols,
+ m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
+ (m_width + 7) & ~7UL,
+ false)))
+ memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols));
+ else
+ m_cols.bitmap= 0; // to not free it
+}
+#endif
+
+Rows_log_event::Rows_log_event(const char *buf, uint event_len,
+ Log_event_type event_type,
+ const Format_description_log_event
+ *description_event)
+ : Log_event(buf, description_event),
+ m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
+{
+ DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)");
+ uint8 const common_header_len= description_event->common_header_len;
+ uint8 const post_header_len= description_event->post_header_len[event_type-1];
+
+ DBUG_PRINT("enter",("event_len=%ld, common_header_len=%d, "
+ "post_header_len=%d",
+ event_len, common_header_len,
+ post_header_len));
+
+ const char *post_start= buf + common_header_len;
+ post_start+= RW_MAPID_OFFSET;
+ if (post_header_len == 6)
+ {
+ /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
+ m_table_id= uint4korr(post_start);
+ post_start+= 4;
+ }
+ else
+ {
+ m_table_id= uint6korr(post_start);
+ post_start+= RW_FLAGS_OFFSET;
+ }
+
+ DBUG_ASSERT(m_table_id != ULONG_MAX);
+
+ m_flags= uint2korr(post_start);
+
+ byte const *const var_start= buf + common_header_len + post_header_len;
+ byte const *const ptr_width= var_start;
+ byte const *const ptr_after_width= my_vle_decode(&m_width, ptr_width);
+
+ const uint byte_count= (m_width + 7) / 8;
+ const char* const ptr_rows_data= var_start + byte_count + 1;
+
+ my_size_t const data_size= event_len - (ptr_rows_data - buf);
+ DBUG_PRINT("info",("m_table_id=%lu, m_flags=%d, m_width=%u, data_size=%lu",
+ m_table_id, m_flags, m_width, data_size));
+
+ m_rows_buf= my_malloc(data_size, MYF(MY_WME));
+ if (likely((bool)m_rows_buf))
+ {
+ /* if bitmap_init fails, catched in is_valid() */
+ if (likely(!bitmap_init(&m_cols,
+ m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
+ (m_width + 7) & ~7UL,
+ false)))
+ memcpy(m_cols.bitmap, ptr_after_width, byte_count);
+ m_rows_end= m_rows_buf + data_size;
+ m_rows_cur= m_rows_end;
+ memcpy(m_rows_buf, ptr_rows_data, data_size);
+ }
+ else
+ m_cols.bitmap= 0; // to not free it
+
+ DBUG_VOID_RETURN;
+}
+
+Rows_log_event::~Rows_log_event()
+{
+ if (m_cols.bitmap == m_bitbuf) // no my_malloc happened
+ m_cols.bitmap= 0; // so no my_free in bitmap_free
+ bitmap_free(&m_cols); // To pair with bitmap_init().
+ my_free(m_rows_buf, MYF(MY_ALLOW_ZERO_PTR));
+}
+
+#ifndef MYSQL_CLIENT
+int Rows_log_event::do_add_row_data(byte *const row_data,
+ my_size_t const length)
+{
+ /*
+ When the table has a primary key, we would probably want, by default, to
+ log only the primary key value instead of the entire "before image". This
+ would save binlog space. TODO
+ */
+ DBUG_ENTER("Rows_log_event::do_add_row_data(byte *data, my_size_t length)");
+ DBUG_PRINT("enter", ("row_data= %p, length= %lu", row_data, length));
+ DBUG_DUMP("row_data", row_data, min(length, 32));
+
+ DBUG_ASSERT(m_rows_buf <= m_rows_cur);
+ DBUG_ASSERT(m_rows_buf < m_rows_end);
+ DBUG_ASSERT(m_rows_cur <= m_rows_end);
+
+ /* The cast will always work since m_rows_cur <= m_rows_end */
+ if (static_cast<my_size_t>(m_rows_end - m_rows_cur) < length)
+ {
+ my_size_t const block_size= 1024;
+ my_ptrdiff_t const old_alloc= m_rows_end - m_rows_buf;
+ my_ptrdiff_t const new_alloc=
+ old_alloc + block_size * (length / block_size + block_size - 1);
+ my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf;
+
+ byte* const new_buf= my_realloc(m_rows_buf, new_alloc, MYF(MY_WME));
+ if (unlikely(!new_buf))
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+
+ /* If the memory moved, we need to move the pointers */
+ if (new_buf != m_rows_buf)
+ {
+ m_rows_buf= new_buf;
+ m_rows_cur= m_rows_buf + cur_size;
+ }
+
+ /*
+ The end pointer should always be changed to point to the end of
+ the allocated memory.
+ */
+ m_rows_end= m_rows_buf + new_alloc;
+ }
+
+ DBUG_ASSERT(m_rows_cur + length < m_rows_end);
+ memcpy(m_rows_cur, row_data, length);
+ m_rows_cur+= length;
+ DBUG_RETURN(0);
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+/*
+ Unpack a row into a record. The row is assumed to only consist of the fields
+ for which the bitset represented by 'arr' and 'bits'; the other parts of the
+ record are left alone.
+ */
+static char const *unpack_row(TABLE *table,
+ char *record, char const *row,
+ MY_BITMAP const *cols)
+{
+ DBUG_ASSERT(record && row);
+
+ MY_BITMAP *write_set= table->file->write_set;
+ my_size_t const n_null_bytes= table->s->null_bytes;
+ my_ptrdiff_t const offset= record - (byte*) table->record[0];
+
+ memcpy(record, row, n_null_bytes);
+ char const *ptr= row + n_null_bytes;
+
+ bitmap_set_all(write_set);
+ Field **const begin_ptr = table->field;
+ for (Field **field_ptr= begin_ptr ; *field_ptr ; ++field_ptr)
+ {
+ Field *const f= *field_ptr;
+
+ if (bitmap_is_set(cols, field_ptr - begin_ptr))
+ {
+ /* Field...::unpack() cannot return 0 */
+ ptr= f->unpack(f->ptr + offset, ptr);
+ }
+ else
+ bitmap_clear_bit(write_set, (field_ptr - begin_ptr) + 1);
+ }
+ return ptr;
+}
+
+int Rows_log_event::exec_event(st_relay_log_info *rli)
+{
+ DBUG_ENTER("Rows_log_event::exec_event(st_relay_log_info*)");
+ DBUG_ASSERT(m_table_id != ULONG_MAX);
+ int error= 0;
+ char const *row_start= m_rows_buf;
+ TABLE* table= rli->m_table_map.get_table(m_table_id);
+
+ /*
+ 'thd' has been set by exec_relay_log_event(), just before calling
+ exec_event(). We still check here to prevent future coding errors.
+ */
+ DBUG_ASSERT(rli->sql_thd == thd);
+
+ /*
+ lock_tables() reads the contents of thd->lex, so they must be
+ initialized, so we should call lex_start(); to be even safer, we call
+ mysql_init_query() which does a more complete set of inits.
+ */
+ mysql_init_query(thd, NULL, 0);
+
+ if (table)
+ {
+ /*
+ table == NULL means that this table should not be
+ replicated (this was set up by Table_map_log_event::exec_event() which
+ tested replicate-* rules).
+ */
+ TABLE_LIST table_list;
+ bool need_reopen;
+ uint count= 1;
+ bzero(&table_list, sizeof(table_list));
+ table_list.lock_type= TL_WRITE;
+ table_list.next_global= table_list.next_local= 0;
+ table_list.table= table;
+
+ for ( ; ; )
+ {
+ table_list.db= const_cast<char*>(table->s->db.str);
+ table_list.alias= table_list.table_name=
+ const_cast<char*>(table->s->table_name.str);
+
+ if ((error= lock_tables(thd, &table_list, count, &need_reopen)) == 0)
+ break;
+ if (!need_reopen)
+ {
+ slave_print_msg(ERROR_LEVEL, rli, error,
+ "Error in %s event: error during table %s.%s lock",
+ get_type_str(), table->s->db, table->s->table_name);
+ DBUG_RETURN(error);
+ }
+ /*
+ we need to store a local copy of the table names since the table object
+ will become invalid after close_tables_for_reopen
+ */
+ char *db= my_strdup(table->s->db.str, MYF(MY_WME));
+ char *table_name= my_strdup(table->s->table_name.str, MYF(MY_WME));
+
+ if (db == 0 || table_name == 0)
+ {
+ /*
+ Since the lock_tables() failed, the table is not locked, so
+ we don't need to unlock them.
+ */
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+ }
+
+ /*
+ We also needs to flush the pending RBR event, since it keeps a
+ pointer to an open table.
+
+ ALTERNATIVE SOLUTION: Extract a pointer to the pending RBR
+ event and reset the table pointer after the tables has been
+ reopened.
+ */
+ thd->binlog_flush_pending_rows_event(false);
+
+ close_tables_for_reopen(thd, &table_list);
+
+ /* open the table again, same as in Table_map_event::exec_event */
+ table_list.db= const_cast<char*>(db);
+ table_list.alias= table_list.table_name= const_cast<char*>(table_name);
+ table_list.updating= 1;
+ TABLE_LIST *tables= &table_list;
+ if ((error= open_tables(thd, &tables, &count, 0)) == 0)
+ {
+ /* reset some variables for the table list*/
+ table_list.updating= 0;
+ /* retrieve the new table reference and update the table map */
+ table= table_list.table;
+ error= rli->m_table_map.set_table(m_table_id, table);
+ }
+ else /* error in open_tables */
+ {
+ if (thd->query_error || thd->is_fatal_error)
+ {
+ /*
+ Error reporting borrowed from Query_log_event with many excessive
+ simplifications (we don't honour --slave-skip-errors)
+ */
+ uint actual_error= thd->net.last_errno;
+ slave_print_msg(ERROR_LEVEL, rli, actual_error,
+ "Error '%s' on reopening table `%s`.`%s`",
+ (actual_error ? thd->net.last_error :
+ "unexpected success or fatal error"),
+ db, table_name);
+ thd->query_error= 1;
+ }
+ }
+ my_free((char*) db, MYF(MY_ALLOW_ZERO_PTR));
+ my_free((char*) table_name, MYF(MY_ALLOW_ZERO_PTR));
+
+ if (error)
+ DBUG_RETURN(error);
+ }
+
+ /*
+ It's not needed to set_time() but
+ 1) it continues the property that "Time" in SHOW PROCESSLIST shows how
+ much slave is behind
+ 2) it will be needed when we allow replication from a table with no
+ TIMESTAMP column to a table with one.
+ So we call set_time(), like in SBR. Presently it changes nothing.
+ */
+ thd->set_time((time_t)when);
+ /*
+ There are a few flags that are replicated with each row event.
+ Make sure to set/clear them before executing the main body of
+ the event.
+ */
+ if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
+ thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS;
+ else
+ thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
+
+ if (get_flags(RELAXED_UNIQUE_CHECKS_F))
+ thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
+ else
+ thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
+ /* A small test to verify that objects have consistent types */
+ DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
+
+ error= do_before_row_operations(table);
+ while (error == 0 && row_start < m_rows_end) {
+ char const *row_end= do_prepare_row(thd, table, row_start);
+ DBUG_ASSERT(row_end != NULL); // cannot happen
+ DBUG_ASSERT(row_end <= m_rows_end);
+
+ /* in_use can have been set to NULL in close_tables_for_reopen */
+ THD* old_thd= table->in_use;
+ if (!table->in_use)
+ table->in_use= thd;
+ error= do_exec_row(table);
+ table->in_use = old_thd;
+ switch (error)
+ {
+ /* Some recoverable errors */
+ case HA_ERR_RECORD_CHANGED:
+ case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if
+ tuple does not exist */
+ error= 0;
+ case 0:
+ break;
+
+ default:
+ slave_print_msg(ERROR_LEVEL, rli, error,
+ "Error in %s event: row application failed",
+ get_type_str());
+ thd->query_error= 1;
+ break;
+ }
+
+ row_start= row_end;
+ }
+ DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event",
+ rli->abort_slave=1;);
+ error= do_after_row_operations(table, error);
+ if (!cache_stmt)
+ thd->options|= OPTION_STATUS_NO_TRANS_UPDATE;
+
+ }
+
+ if (error)
+ { /* error has occured during the transaction */
+ /*
+ If one day we honour --skip-slave-errors in row-based replication, and
+ the error should be skipped, then we would clear mappings, rollback,
+ close tables, but the slave SQL thread would not stop and then may
+ assume the mapping is still available, the tables are still open...
+ So then we should clear mappings/rollback/close here only if this is a
+ STMT_END_F.
+ For now we code, knowing that error is not skippable and so slave SQL
+ thread is certainly going to stop.
+ */
+ rli->cleanup_context(thd, 1);
+ thd->query_error= 1;
+ DBUG_RETURN(error);
+ }
+
+ if (get_flags(STMT_END_F))
+ {
+ /*
+ This is the end of a statement or transaction, so close (and
+ unlock) the tables we opened when processing the
+ Table_map_log_event starting the statement.
+
+ OBSERVER. This will clear *all* mappings, not only those that
+ are open for the table. There is not good handle for on-close
+ actions for tables.
+
+ NOTE. Even if we have no table ('table' == 0) we still need to be
+ here, so that we increase the group relay log position. If we didn't, we
+ could have a group relay log position which lags behind "forever"
+ (assume the last master's transaction is ignored by the slave because of
+ replicate-ignore rules).
+ */
+ thd->binlog_flush_pending_rows_event(true);
+ /*
+ If this event is not in a transaction, the call below will, if some
+ transactional storage engines are involved, commit the statement into
+ them and flush the pending event to binlog.
+ If this event is in a transaction, the call will do nothing, but a
+ Xid_log_event will come next which will, if some transactional engines
+ are involved, commit the transaction and flush the pending event to the
+ binlog.
+ */
+ error= ha_autocommit_or_rollback(thd, 0);
+ /*
+ Now what if this is not a transactional engine? we still need to
+ flush the pending event to the binlog; we did it with
+ thd->binlog_flush_pending_rows_event(). Note that we imitate
+ what is done for real queries: a call to
+ ha_autocommit_or_rollback() (sometimes only if involves a
+ transactional engine), and a call to be sure to have the pending
+ event flushed.
+ */
+
+ rli->cleanup_context(thd, 0);
+ rli->transaction_end(thd);
+
+ if (error == 0)
+ {
+ /*
+ Clear any errors pushed in thd->net.last_err* if for example "no key
+ found" (as this is allowed). This is a safety measure; apparently
+ those errors (e.g. when executing a Delete_rows_log_event of a
+ non-existing row, like in rpl_row_mystery22.test,
+ thd->net.last_error = "Can't find record in 't1'" and last_errno=1032)
+ do not become visible. We still prefer to wipe them out.
+ */
+ thd->clear_error();
+ error= Log_event::exec_event(rli);
+ }
+ else
+ slave_print_msg(ERROR_LEVEL, rli, error,
+ "Error in %s event: commit of row events failed, "
+ "table `%s`.`%s`",
+ get_type_str(), table->s->db, table->s->table_name);
+ DBUG_RETURN(error);
+ }
+
+ if (table)
+ {
+ /*
+ As "table" is not NULL, we did a successful lock_tables(), without any
+ prior LOCK TABLES and are not in prelocked mode, so this assertion should
+ be true.
+ */
+ DBUG_ASSERT(thd->lock);
+ /*
+ If we are here, there are more events to come which may use our mappings
+ and our table. So don't clear mappings or close tables, just unlock
+ tables.
+ Why don't we lock the table once for all in
+ Table_map_log_event::exec_event() ? Because we could have in binlog:
+ BEGIN;
+ Table_map t1 -> 1
+ Write_rows to id 1
+ Table_map t2 -> 2
+ Write_rows to id 2
+ Xid_log_event
+ So we cannot lock t1 when executing the first Table_map, because at that
+ moment we don't know we'll also have to lock t2, and all tables must be
+ locked at once in MySQL.
+ */
+ mysql_unlock_tables(thd, thd->lock);
+ thd->lock= 0;
+ if ((table->s->primary_key == MAX_KEY) &&
+ !cache_stmt)
+ {
+ /*
+ ------------ Temporary fix until WL#2975 is implemented ---------
+ This event is not the last one (no STMT_END_F). If we stop now (in
+ case of terminate_slave_thread()), how will we restart? We have to
+ restart from Table_map_log_event, but as this table is not
+ transactional, the rows already inserted will still be present, and
+ idempotency is not guaranteed (no PK) so we risk that repeating leads
+ to double insert. So we desperately try to continue, hope we'll
+ eventually leave this buggy situation (by executing the final
+ Rows_log_event). If we are in a hopeless wait (reached end of last
+ relay log and nothing gets appended there), we timeout after one
+ minute, and notify DBA about the problem.
+ When WL#2975 is implemented, just remove the member
+ st_relay_log_info::unsafe_to_stop_at and all its occurences.
+ */
+ rli->unsafe_to_stop_at= time(0);
+ }
+ }
+
+ DBUG_ASSERT(error == 0);
+ thd->clear_error();
+ rli->inc_event_relay_log_pos();
+
+ DBUG_RETURN(0);
+}
+#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
+
+#ifndef MYSQL_CLIENT
+/*
+  Write the fixed post-header of a Rows event to the binlog: a 6-byte
+  table id followed by a 2-byte flags word.  Under the debug trigger
+  "old_row_based_repl_4_byte_map_id_master" the pre-5.1.4 layout (4-byte
+  table id + 2-byte flags, 6 bytes total) is written instead.
+  Returns non-zero (true) if the write fails.
+*/
+bool Rows_log_event::write_data_header(IO_CACHE *file)
+{
+  DBUG_ASSERT(m_table_id != ULONG_MAX);
+  byte buf[ROWS_HEADER_LEN]; // No need to init the buffer
+  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
+                  {
+                    int4store(buf + 0, m_table_id);
+                    int2store(buf + 4, m_flags);
+                    return (my_b_safe_write(file, buf, 6));
+                  });
+  int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
+  int2store(buf + RW_FLAGS_OFFSET, m_flags);
+  return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
+}
+
+/*
+  Write the variable part of a Rows event: the column count (m_width)
+  as a variable-length-encoded integer, the columns-present bitmap, and
+  the packed row data accumulated between m_rows_buf and m_rows_cur.
+  Returns non-zero (true) if any write fails.
+*/
+bool Rows_log_event::write_data_body(IO_CACHE*file)
+{
+  /*
+     Note that this should be the number of *bits*, not the number of
+     bytes.
+  */
+  byte sbuf[my_vle_sizeof(m_width)];
+  my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
+
+  /*
+    NOTE(review): sbuf_end is declared char* here but byte* in the
+    equivalent code in Table_map_log_event::write_data_body() (my_vle_encode
+    result) -- confirm the intended type and make the two sites consistent.
+  */
+  char *const sbuf_end= my_vle_encode(sbuf, sizeof(sbuf), m_width);
+  DBUG_ASSERT(static_cast<my_size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
+
+  return (my_b_safe_write(file, sbuf, sbuf_end - sbuf) ||
+          my_b_safe_write(file, reinterpret_cast<byte*>(m_cols.bitmap),
+                          no_bytes_in_map(&m_cols)) ||
+          my_b_safe_write(file, m_rows_buf, data_size));
+}
+#endif
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) && defined(DBUG_RBR)
+/*
+  Debug-only (DBUG_RBR) SHOW BINLOG EVENTS info: "<db>.<table> - <flags>".
+  NOTE(review): uses bare snprintf rather than my_snprintf used elsewhere
+  in the server -- confirm portability on platforms lacking snprintf.
+*/
+void Rows_log_event::pack_info(Protocol *protocol)
+{
+  char buf[256];
+  char const *const flagstr= get_flags(STMT_END_F) ? "STMT_END_F" : "";
+  char const *const dbnam= m_table->s->db;
+  char const *const tblnam= m_table->s->table_name;
+  my_size_t bytes= snprintf(buf, sizeof(buf),
+                            "%s.%s - %s", dbnam, tblnam, flagstr);
+  protocol->store(buf, bytes, &my_charset_bin);
+}
+#endif
+
+/**************************************************************************
+ Table_map_log_event member functions
+**************************************************************************/
+
+/*
+  Constructor used to build an event for writing to the binary log.
+  Mats says tbl->s lives longer than this event so it's ok to copy pointers
+  (tbl->s->db etc) and not pointer content.
+ */
+#if !defined(MYSQL_CLIENT)
+Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
+                                         bool is_transactional, uint16 flags)
+  : Log_event(thd, 0, is_transactional),
+    m_table(tbl),
+    m_dbnam(tbl->s->db.str),
+    m_dblen(m_dbnam ? tbl->s->db.length : 0),
+    m_tblnam(tbl->s->table_name.str),
+    m_tbllen(tbl->s->table_name.length),
+    m_colcnt(tbl->s->fields), m_coltype(0),
+    m_table_id(tid),
+    m_flags(flags)
+{
+  DBUG_ASSERT(m_table_id != ULONG_MAX);
+  /*
+    In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
+    table.cc / alloc_table_share():
+      Use the fact the key is db/0/table_name/0
+    As we rely on this let's assert it.
+  */
+  DBUG_ASSERT((tbl->s->db.str == 0) ||
+              (tbl->s->db.str[tbl->s->db.length] == 0));
+  DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
+
+
+  /* Pre-compute the on-disk event size (see write_data_header/body). */
+  m_data_size= TABLE_MAP_HEADER_LEN;
+  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;)
+  m_data_size+= m_dblen + 2;	// Include length and terminating \0
+  m_data_size+= m_tbllen + 2;	// Include length and terminating \0
+  m_data_size+= 1 + m_colcnt;	// COLCNT and column types
+
+  /* If malloc fails, caught in is_valid() */
+  if ((m_memory= my_malloc(m_colcnt, MYF(MY_WME))))
+  {
+    /* Record each field's type code so the slave can verify its table. */
+    m_coltype= reinterpret_cast<unsigned char*>(m_memory);
+    for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
+      m_coltype[i]= m_table->field[i]->type();
+  }
+}
+#endif /* !defined(MYSQL_CLIENT) */
+
+/*
+  Constructor used by slave to read the event from the binary log.
+  Parses the post-header (table id + flags, handling the old 6-byte
+  pre-5.1.4 post-header) and the variable part (db name, table name,
+  column count and per-column type codes) into one my_multi_malloc block.
+ */
+#if defined(HAVE_REPLICATION)
+Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
+                                         const Format_description_log_event
+                                         *description_event)
+
+  : Log_event(buf, description_event),
+#ifndef MYSQL_CLIENT
+    m_table(NULL),
+#endif
+    m_memory(NULL)
+{
+  DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");
+
+  uint8 common_header_len= description_event->common_header_len;
+  uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
+  DBUG_PRINT("info",("event_len=%ld, common_header_len=%d, post_header_len=%d",
+                     event_len, common_header_len, post_header_len));
+
+  DBUG_DUMP("event buffer", buf, event_len);
+
+  /* Read the post-header */
+  const char *post_start= buf + common_header_len;
+
+  post_start+= TM_MAPID_OFFSET;
+  if (post_header_len == 6)
+  {
+    /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
+    m_table_id= uint4korr(post_start);
+    post_start+= 4;
+  }
+  else
+  {
+    DBUG_ASSERT(post_header_len == TABLE_MAP_HEADER_LEN);
+    m_table_id= uint6korr(post_start);
+    post_start+= TM_FLAGS_OFFSET;
+  }
+
+  DBUG_ASSERT(m_table_id != ULONG_MAX);
+
+  m_flags= uint2korr(post_start);
+
+  /* Read the variable part of the event */
+  const char *const vpart= buf + common_header_len + post_header_len;
+
+  /* Extract the length of the various parts from the buffer */
+  byte const* const ptr_dblen= vpart + 0;
+  m_dblen= *(unsigned char*) ptr_dblen;
+
+  /* Length of database name + counter + terminating null */
+  byte const* const ptr_tbllen= ptr_dblen + m_dblen + 2;
+  m_tbllen= *(unsigned char*) ptr_tbllen;
+
+  /* Length of table name + counter + terminating null */
+  byte const* const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
+  byte const* const ptr_after_colcnt= my_vle_decode(&m_colcnt, ptr_colcnt);
+
+  DBUG_PRINT("info",("m_dblen=%d off=%d m_tbllen=%d off=%d m_colcnt=%d off=%d",
+                     m_dblen, ptr_dblen-vpart, m_tbllen, ptr_tbllen-vpart,
+                     m_colcnt, ptr_colcnt-vpart));
+
+  /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
+  m_memory= my_multi_malloc(MYF(MY_WME),
+                            &m_dbnam, m_dblen + 1,
+                            &m_tblnam, m_tbllen + 1,
+                            &m_coltype, m_colcnt,
+                            NULL);
+
+  if (m_memory)
+  {
+    /* Copy the different parts into their memory */
+    /* The +1 copies the terminating NUL stored after each name. */
+    strncpy(const_cast<char*>(m_dbnam), ptr_dblen + 1, m_dblen + 1);
+    strncpy(const_cast<char*>(m_tblnam), ptr_tbllen + 1, m_tbllen + 1);
+    memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
+  }
+
+  DBUG_VOID_RETURN;
+}
+#endif
+
+/* Free the single my_multi_malloc block holding names and column types. */
+Table_map_log_event::~Table_map_log_event()
+{
+  my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR));
+}
+
+/*
+  Find a table based on database name and table name.
+
+  DESCRIPTION
+
+    Currently, only the first table of the 'table_list' is located. If the
+    table is found in the list of open tables for the thread, the 'table'
+    field of 'table_list' is filled in.
+
+  PARAMETERS
+
+    thd        Thread structure
+    table_list List of tables to locate in the thd->open_tables list.
+    count      Pointer to a variable that will be set to the number of
+               tables found. If the pointer is NULL, nothing will be stored.
+
+  RETURN VALUE
+
+    The number of tables found (currently 0 or 1, since only the first
+    entry of 'table_list' is searched for).
+
+  TO DO
+
+    Replace the list of table searches with a hash based on the combined
+    database and table name.  The handler_tables_hash is inappropriate since
+    it hashes on the table alias.  At the same time, the function can be
+    extended to handle a full list of table names, in the same spirit as
+    open_tables() and lock_tables().
+*/
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+static uint find_tables(THD *thd, TABLE_LIST *table_list, uint *count)
+{
+  uint result= 0;
+
+  /* we verify that the caller knows our limitation */
+  DBUG_ASSERT(table_list->next_global == 0);
+  /* Linear scan of the thread's open tables; see TO DO above. */
+  for (TABLE *table= thd->open_tables; table ; table= table->next)
+  {
+    if (strcmp(table->s->db.str, table_list->db) == 0
+	&& strcmp(table->s->table_name.str, table_list->table_name) == 0)
+    {
+      /* Copy the table pointer into the table list. */
+      table_list->table= table;
+      result= 1;
+      break;
+    }
+  }
+
+  if (count)
+    *count= result;
+  return result;
+}
+#endif
+
+/*
+  Return value is an error code, one of:
+
+      -1     Failure to open table   [from open_tables()]
+       0     Success
+       1     No room for more tables [from set_table()]
+       2     Out of memory           [from set_table()]
+       3     Wrong table definition
+       4     Daisy-chaining RBR with SBR not possible
+
+  NOTE(review): the body below actually returns the symbolic constants
+  ERR_RBR_TO_SBR and ERR_BAD_TABLE_DEF -- confirm these still match the
+  numeric codes listed above and keep this table in sync.
+ */
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+int Table_map_log_event::exec_event(st_relay_log_info *rli)
+{
+  DBUG_ENTER("Table_map_log_event::exec_event(st_relay_log_info*)");
+
+  DBUG_ASSERT(rli->sql_thd == thd);
+
+  /* Step the query id to mark what columns that are actually used. */
+  pthread_mutex_lock(&LOCK_thread_count);
+  thd->query_id= next_query_id();
+  pthread_mutex_unlock(&LOCK_thread_count);
+
+  /* Build a one-entry table list for the mapped table (with db rewrite). */
+  TABLE_LIST table_list;
+  uint32 dummy_len;
+  bzero(&table_list, sizeof(table_list));
+  table_list.db= const_cast<char *>
+    (rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
+  table_list.alias= table_list.table_name= const_cast<char*>(m_tblnam);
+  table_list.lock_type= TL_WRITE;
+  table_list.next_global= table_list.next_local= 0;
+  table_list.updating= 1;
+
+  int error= 0;
+
+  if (rpl_filter->db_ok(table_list.db) &&
+      (!rpl_filter->is_on() || rpl_filter->tables_ok("", &table_list)))
+  {
+    /*
+      Check if the slave is set to use SBR.  If so, the slave should
+      stop immediately since it is not possible to daisy-chain from
+      RBR to SBR.  Once RBR is used, the rest of the chain has to use
+      RBR.
+    */
+    if (mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG) &&
+        !binlog_row_based)
+    {
+      /*
+        NOTE(review): the message text says "--binrow-format=row"; the
+        option is spelled --binlog-format -- fix the string in a code
+        change (string literals are runtime behavior, not comments).
+      */
+      slave_print_msg(ERROR_LEVEL, rli, ER_BINLOG_ROW_RBR_TO_SBR,
+                      "It is not possible to use statement-based binlogging "
+                      "on a slave that replicates row-based. Please use "
+                      "--binrow-format=row on slave if you want to use "
+                      "--log-slave-updates and read row-based binlog events.");
+      DBUG_RETURN(ERR_RBR_TO_SBR);
+    }
+
+    /*
+      Open the table if it is not already open and add the table to table map.
+      If the table should not be replicated, we don't bother to do anything.
+      The table map will return NULL and the row-level event will effectively
+      be a no-op.
+    */
+    uint count;
+    if (find_tables(thd, &table_list, &count) == 0)
+    {
+      /*
+        open_tables() reads the contents of thd->lex, so they must be
+        initialized, so we should call lex_start(); to be even safer, we call
+        mysql_init_query() which does a more complete set of inits.
+      */
+      mysql_init_query(thd, NULL, 0);
+      TABLE_LIST *tables= &table_list;
+      if ((error= open_tables(thd, &tables, &count, 0)))
+      {
+        if (thd->query_error || thd->is_fatal_error)
+        {
+          /*
+            Error reporting borrowed from Query_log_event with many excessive
+            simplifications (we don't honour --slave-skip-errors)
+          */
+          uint actual_error= thd->net.last_errno;
+          slave_print_msg(ERROR_LEVEL, rli, actual_error,
+                          "Error '%s' on opening table `%s`.`%s`",
+                          (actual_error ? thd->net.last_error :
+                           "unexpected success or fatal error"),
+                          table_list.db, table_list.table_name);
+          thd->query_error= 1;
+        }
+        DBUG_RETURN(error);
+      }
+    }
+
+    m_table= table_list.table;
+
+    /*
+      This will fail later otherwise, the 'in_use' field should be
+      set to the current thread.
+    */
+    DBUG_ASSERT(m_table->in_use);
+
+    /*
+      Check that the number of columns and the field types in the
+      event match the number of columns and field types in the opened
+      table.
+    */
+    uint col= m_table->s->fields;
+
+    if (col == m_colcnt)
+    {
+      /* col ends at the first mismatching index, or wraps to UINT_MAX. */
+      while (col-- > 0)
+        if (m_table->field[col]->type() != m_coltype[col])
+          break;
+    }
+
+    TABLE_SHARE const *const tsh= m_table->s;
+
+    /*
+      Check the following termination conditions:
+
+        (col == m_table->s->fields)
+            ==> (m_table->s->fields != m_colcnt)
+        (0 <= col < m_table->s->fields)
+            ==> (m_table->field[col]->type() != m_coltype[col])
+
+      Logically, A ==> B is equivalent to !A || B
+
+      Since col is unsigned, it suffices to check that col <=
+      tsh->fields.  If col wrapped (by decreasing col when it is 0),
+      the number will be UINT_MAX, which is greater than tsh->fields.
+    */
+    DBUG_ASSERT(!(col == tsh->fields) || tsh->fields != m_colcnt);
+    DBUG_ASSERT(!(col < tsh->fields) ||
+                (m_table->field[col]->type() != m_coltype[col]));
+
+    if (col <= tsh->fields)
+    {
+      /*
+        If we get here, the number of columns in the event didn't
+        match the number of columns in the table on the slave, *or*
+        there was a column in the table on the slave that did not
+        have the same type as given in the event.
+
+        If 'col' has the value that was assigned to it, it was a
+        mismatch between the number of columns on the master and the
+        slave.
+      */
+      if (col == tsh->fields)
+      {
+        DBUG_ASSERT(tsh->db.str && tsh->table_name.str);
+        slave_print_msg(ERROR_LEVEL, rli, ER_BINLOG_ROW_WRONG_TABLE_DEF,
+                        "Table width mismatch - "
+                        "received %u columns, %s.%s has %u columns",
+                        m_colcnt, tsh->db.str, tsh->table_name.str, tsh->fields);
+      }
+      else
+      {
+        DBUG_ASSERT(col < m_colcnt && col < tsh->fields);
+        DBUG_ASSERT(tsh->db.str && tsh->table_name.str);
+        slave_print_msg(ERROR_LEVEL, rli, ER_BINLOG_ROW_WRONG_TABLE_DEF,
+                        "Column %d type mismatch - "
+                        "received type %d, %s.%s has type %d",
+                        col, m_coltype[col], tsh->db.str, tsh->table_name.str,
+                        m_table->field[col]->type());
+      }
+
+      thd->query_error= 1;
+      DBUG_RETURN(ERR_BAD_TABLE_DEF);
+    }
+
+    /*
+      We record in the slave's information that the number m_table_id is
+      mapped to the m_table object
+    */
+    if (!error)
+      error= rli->m_table_map.set_table(m_table_id, m_table);
+
+    /*
+      Tell the RLI that we are touching a table.
+
+      TODO: Maybe we can combine this with the previous operation?
+    */
+    if (!error)
+      rli->touching_table(m_dbnam, m_tblnam, m_table_id);
+  }
+
+  /*
+    We explicitly do not call Log_event::exec_event() here since we do not
+    want the relay log position to be flushed to disk. The flushing will be
+    done by the last Rows_log_event that either ends a statement (outside a
+    transaction) or a transaction.
+
+    A table map event can *never* end a transaction or a statement, so we
+    just step the relay log position.
+  */
+
+  if (likely(!error))
+    rli->inc_event_relay_log_pos();
+
+  DBUG_RETURN(error);
+}
+#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
+
+#ifndef MYSQL_CLIENT
+/*
+  Write the Table_map post-header: 6-byte table id + 2-byte flags (or the
+  old 6-byte total layout under the debug trigger, as in
+  Rows_log_event::write_data_header).  Returns true on write failure.
+*/
+bool Table_map_log_event::write_data_header(IO_CACHE *file)
+{
+  DBUG_ASSERT(m_table_id != ULONG_MAX);
+  byte buf[TABLE_MAP_HEADER_LEN];
+  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
+                  {
+                    int4store(buf + 0, m_table_id);
+                    int2store(buf + 4, m_flags);
+                    return (my_b_safe_write(file, buf, 6));
+                  });
+  int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
+  int2store(buf + TM_FLAGS_OFFSET, m_flags);
+  return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
+}
+
+/*
+  Write the variable part of a Table_map event: 1-byte db name length,
+  db name + NUL, 1-byte table name length, table name + NUL, the column
+  count as a variable-length integer, and one type byte per column.
+  Returns true on write failure.
+*/
+bool Table_map_log_event::write_data_body(IO_CACHE *file)
+{
+  DBUG_ASSERT(m_dbnam != NULL);
+  DBUG_ASSERT(m_tblnam != NULL);
+  /* We use only one byte per length for storage in event: */
+  DBUG_ASSERT(m_dblen < 128);
+  DBUG_ASSERT(m_tbllen < 128);
+
+  byte const dbuf[]= { m_dblen };
+  byte const tbuf[]= { m_tbllen };
+
+  byte cbuf[my_vle_sizeof(m_colcnt)];
+  byte *const cbuf_end= my_vle_encode(cbuf, sizeof(cbuf), m_colcnt);
+  DBUG_ASSERT(static_cast<my_size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
+
+  return (my_b_safe_write(file, dbuf,      sizeof(dbuf)) ||
+          my_b_safe_write(file, m_dbnam,   m_dblen+1) ||
+          my_b_safe_write(file, tbuf,      sizeof(tbuf)) ||
+          my_b_safe_write(file, m_tblnam,  m_tbllen+1) ||
+          my_b_safe_write(file, cbuf, cbuf_end - cbuf) ||
+          my_b_safe_write(file, reinterpret_cast<char*>(m_coltype), m_colcnt));
+ }
+#endif
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+
+/*
+  Print some useful information for the SHOW BINARY LOG information
+  field.  Formats "<db>.<table>" for the Info column.
+  NOTE(review): bare snprintf -- see the same note on
+  Rows_log_event::pack_info about my_snprintf/portability.
+ */
+
+void Table_map_log_event::pack_info(Protocol *protocol)
+{
+    char buf[256];
+    my_size_t bytes= snprintf(buf, sizeof(buf), "%s.%s", m_dbnam, m_tblnam);
+    protocol->store(buf, bytes, &my_charset_bin);
+}
+
+#endif
+
+
+#ifdef MYSQL_CLIENT
+/*
+  mysqlbinlog output for a Table_map event: header line plus the base64
+  encoding of the event body (suppressed entirely in short form).
+*/
+void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
+{
+  if (!print_event_info->short_form)
+  {
+    print_header(file, print_event_info);
+    fprintf(file, "\tTable_map: `%s`.`%s` mapped to number %lu\n",
+            m_dbnam, m_tblnam, m_table_id);
+    print_base64(file, print_event_info);
+  }
+}
+#endif
+
+/**************************************************************************
+ Write_rows_log_event member functions
+**************************************************************************/
+
+/*
+  Constructor used to build an event for writing to the binary log.
+  All state lives in the Rows_log_event base; nothing extra to set up.
+ */
+#if !defined(MYSQL_CLIENT)
+Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
+                                           ulong tid_arg,
+                                           MY_BITMAP const *cols,
+                                           bool is_transactional)
+  : Rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional)
+{
+}
+#endif
+
+/*
+  Constructor used by slave to read the event from the binary log.
+  Delegates all parsing to the Rows_log_event base constructor.
+ */
+#ifdef HAVE_REPLICATION
+Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
+                                           const Format_description_log_event
+                                           *description_event)
+: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
+{
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+/*
+  Per-statement setup before applying the rows of a Write_rows event:
+  switch the session to REPLACE semantics, start bulk insert, and disable
+  automatic TIMESTAMP updates.  Returns 0 (error is reserved for future use).
+*/
+int Write_rows_log_event::do_before_row_operations(TABLE *table)
+{
+  int error= 0;
+
+  /*
+    We are using REPLACE semantics and not INSERT IGNORE semantics
+    when writing rows, that is: new rows replace old rows.  We need to
+    inform the storage engine that it should use this behaviour.
+  */
+
+  /* Tell the storage engine that we are using REPLACE semantics. */
+  thd->lex->duplicates= DUP_REPLACE;
+
+  /*
+    Pretend we're executing a REPLACE command: this is needed for
+    InnoDB and NDB Cluster since they are not (properly) checking the
+    lex->duplicates flag.
+  */
+  thd->lex->sql_command= SQLCOM_REPLACE;
+
+  table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); // needed for ndbcluster
+  /*
+    TODO: the cluster team (Tomas?) says that it's better if the engine knows
+    how many rows are going to be inserted, then it can allocate needed memory
+    from the start.
+  */
+  table->file->start_bulk_insert(0);
+  /*
+    We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not fill
+    any TIMESTAMP column with data from the row but instead will use
+    the event's current time.
+    As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
+    columns, we know that all TIMESTAMP columns on slave will receive explicit
+    data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
+    When we allow a table without TIMESTAMP to be replicated to a table having
+    more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
+    column to be replicated into a BIGINT column and the slave's table has a
+    TIMESTAMP column, then the slave's TIMESTAMP column will take its value
+    from set_time() which we called earlier (consistent with SBR). And then in
+    some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
+    analyze if explicit data is provided for slave's TIMESTAMP columns).
+  */
+  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
+  return error;
+}
+
+/*
+  Per-statement cleanup: finish the bulk insert started in
+  do_before_row_operations(), but only if row application succeeded
+  (a prior error takes precedence over end_bulk_insert's result).
+*/
+int Write_rows_log_event::do_after_row_operations(TABLE *table, int error)
+{
+  if (error == 0)
+    error= table->file->end_bulk_insert();
+  return error;
+}
+
+/*
+  Unpack one row image from the event data at row_start into
+  table->record[0].  Returns a pointer just past the consumed bytes
+  (the start of the next row, or the end of the data).
+*/
+char const *Write_rows_log_event::do_prepare_row(THD *thd, TABLE *table,
+                                                 char const *row_start)
+{
+  char const *ptr= row_start;
+  DBUG_ASSERT(table != NULL);
+  /*
+    This assertion actually checks that there is at least as many
+    columns on the slave as on the master.
+  */
+  DBUG_ASSERT(table->s->fields >= m_width);
+  DBUG_ASSERT(ptr);
+  ptr= unpack_row(table, table->record[0], ptr, &m_cols);
+  return ptr;
+}
+
+/*
+  Check if there are more UNIQUE keys after the given key.
+  Returns 1 if 'keyno' is the last key with HA_NOSAME set, 0 otherwise.
+*/
+static int
+last_uniq_key(TABLE *table, uint keyno)
+{
+  while (++keyno < table->s->keys)
+    if (table->key_info[keyno].flags & HA_NOSAME)
+      return 0;
+  return 1;
+}
+
+/* Anonymous namespace for template functions/classes */
+namespace {
+
+  /*
+    Smart pointer that will automatically call my_afree (a macro) when
+    the pointer goes out of scope.  This is used so that I do not have
+    to remember to call my_afree() before each return.  There is no
+    overhead associated with this, since all functions are inline.
+
+    I (Matz) would prefer to use the free function as a template
+    parameter, but that is not possible when the "function" is a
+    macro.
+  */
+  template <class Obj>
+  class auto_afree_ptr
+  {
+    Obj* m_ptr;                        // owned my_alloca pointer, or NULL
+  public:
+    auto_afree_ptr(Obj* ptr) : m_ptr(ptr) { }
+    ~auto_afree_ptr() { if (m_ptr) my_afree(m_ptr); }
+    /* One-shot setter; ownership of 'ptr' transfers to this object. */
+    void assign(Obj* ptr) {
+      /* Only to be called if it hasn't been given a value before. */
+      DBUG_ASSERT(m_ptr == NULL);
+      m_ptr= ptr;
+    }
+    Obj* get() { return m_ptr; }
+  };
+
+}
+
+
+/*
+  Replace the provided record in the database.
+
+  Similar to how it is done in <code>mysql_insert()</code>, we first
+  try to do a <code>ha_write_row()</code> and if that fails due to
+  duplicated keys (or indices), we do an <code>ha_update_row()</code>
+  or a <code>ha_delete_row()</code> instead.
+
+  @param thd    Thread context for writing the record.
+  @param table  Table to which record should be written.
+
+  @return Error code on failure, 0 on success.
+ */
+static int
+replace_record(THD *thd, TABLE *table)
+{
+  DBUG_ASSERT(table != NULL && thd != NULL);
+
+  int error;
+  int keynum;
+  auto_afree_ptr<char> key(NULL);    // lazily allocated; freed on any return
+
+  while ((error= table->file->ha_write_row(table->record[0])))
+  {
+    if ((keynum= table->file->get_dup_key(error)) < 0)
+    {
+      /* We failed to retrieve the duplicate key */
+      return HA_ERR_FOUND_DUPP_KEY;
+    }
+
+    /*
+      We need to retrieve the old row into record[1] to be able to
+      either update or delete the offending record.  We either:
+
+      - use rnd_pos() with a row-id (available as dupp_row) to the
+        offending row, if that is possible (MyISAM and Blackhole), or else
+
+      - use index_read_idx() with the key that is duplicated, to
+        retrieve the offending row.
+     */
+    if (table->file->table_flags() & HA_DUPP_POS)
+    {
+      error= table->file->rnd_pos(table->record[1], table->file->dupp_ref);
+      if (error)
+        return error;
+    }
+    else
+    {
+      if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
+      {
+        return my_errno;
+      }
+
+      if (key.get() == NULL)
+      {
+        /* Allocated once, reused for every duplicate-key iteration. */
+        key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
+        if (key.get() == NULL)
+          return ENOMEM;
+      }
+
+      key_copy(key.get(), table->record[0], table->key_info + keynum, 0);
+      error= table->file->index_read_idx(table->record[1], keynum, key.get(),
+                                         table->key_info[keynum].key_length,
+                                         HA_READ_KEY_EXACT);
+      if (error)
+        return error;
+    }
+
+    /*
+       Now, table->record[1] should contain the offending row.  That
+       will enable us to update it or, alternatively, delete it (so
+       that we can insert the new row afterwards).
+
+       REPLACE is defined as either INSERT or DELETE + INSERT.  If
+       possible, we can replace it with an UPDATE, but that will not
+       work on InnoDB if FOREIGN KEY checks are necessary.
+
+       I (Matz) am not sure of the reason for the last_uniq_key()
+       check, but I'm guessing that it's something along the
+       following lines.
+
+       Suppose that we got the duplicate key to be a key that is not
+       the last unique key for the table and we perform an update:
+       then there might be another key for which the unique check will
+       fail, so we're better off just deleting the row and inserting
+       the correct row.
+     */
+    if (last_uniq_key(table, keynum) &&
+        !table->file->referenced_by_foreign_key())
+    {
+      error=table->file->ha_update_row(table->record[1],
+                                       table->record[0]);
+      return error;
+    }
+    else
+    {
+      if ((error= table->file->ha_delete_row(table->record[1])))
+        return error;
+      /* Will retry ha_write_row() with the offending row removed. */
+    }
+  }
+  return error;
+}
+
+/* Apply one prepared row: insert it with REPLACE semantics. */
+int Write_rows_log_event::do_exec_row(TABLE *table)
+{
+  DBUG_ASSERT(table != NULL);
+  int error= replace_record(thd, table);
+  return error;
+}
+#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
+
+#ifdef MYSQL_CLIENT
+/* mysqlbinlog output: header plus base64 body (skipped in short form). */
+void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
+{
+  if (!print_event_info->short_form)
+  {
+    print_header(file, print_event_info);
+    fprintf(file, "\tWrite_rows: table id %lu", m_table_id);
+    print_base64(file, print_event_info);
+  }
+}
+#endif
+
+/**************************************************************************
+ Delete_rows_log_event member functions
+**************************************************************************/
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+/*
+  Compare two packed record images field by field (binary comparison).
+  Returns 0 if every field compares equal, otherwise the first field's
+  non-zero cmp_binary() result.
+*/
+static int record_compare(TABLE *table, byte const *a, byte const *b)
+{
+  for (my_size_t i= 0 ; i < table->s->fields ; ++i)
+  {
+    uint const off= table->field[i]->offset();
+    uint const res= table->field[i]->cmp_binary(a + off, b + off);
+    if (res != 0) {
+      return res;
+    }
+  }
+  return 0;
+}
+
+
+/*
+  Find the row given by 'key', if the table has keys, or else use a table scan
+  to find (and fetch) the row.  If the engine allows random access of the
+  records, a combination of position() and rnd_pos() will be used.
+
+  The 'record_buf' will be used as buffer for records while locating the
+  correct row.
+
+  Returns 0 on success, a handler error code otherwise.
+ */
+static int find_and_fetch_row(TABLE *table, byte *key, byte *record_buf)
+{
+  DBUG_ENTER("find_and_fetch_row(TABLE *table, byte *key, byte *record)");
+  DBUG_PRINT("enter", ("table=%p, key=%p, record=%p",
+                       table, key, record_buf));
+
+  DBUG_ASSERT(table->in_use != NULL);
+
+  if ((table->file->table_flags() & HA_PRIMARY_KEY_ALLOW_RANDOM_ACCESS)
+      && table->s->primary_key < MAX_KEY)
+  {
+    /*
+      Use a more efficient method to fetch the record given by
+      table->record[0] if the engine allows it.  We first compute a
+      row reference using the position() member function (it will be
+      stored in table->file->ref) and the use rnd_pos() to position
+      the "cursor" at the correct row.
+    */
+    table->file->position(table->record[0]);
+    DBUG_RETURN(table->file->rnd_pos(table->record[0], table->file->ref));
+  }
+
+  DBUG_ASSERT(record_buf);
+
+  if (table->s->keys > 0)
+  {
+    int error;
+    /* We have a key: search the table using key number 0. */
+    if ((error= table->file->index_read_idx(record_buf, 0, key,
+                                            table->key_info->key_length,
+                                            HA_READ_KEY_EXACT)))
+    {
+      table->file->print_error(error, MYF(0));
+      DBUG_RETURN(error);
+    }
+
+    /*
+      Below is a minor "optimization".  If the key (i.e., key number
+      0) has the HA_NOSAME flag set, we know that we have found the
+      correct record (since there can be no duplicates); otherwise, we
+      have to compare the record with the one found to see if it is
+      the correct one.
+
+      CAVEAT! This behaviour is essential for the replication of,
+      e.g., the mysql.proc table since the correct record *shall* be
+      found using the primary key *only*.  There shall be no
+      comparison of non-PK columns to decide if the correct record is
+      found.  I can see no scenario where it would be incorrect to
+      choose the row to change only using a PK or an UNNI (UNIQUE
+      NOT NULL index).
+    */
+    if (table->key_info->flags & HA_NOSAME)
+      DBUG_RETURN(0);
+
+    while (record_compare(table, table->record[0], record_buf) != 0)
+    {
+      /* NOTE(review): this 'error' shadows the one declared above. */
+      int error;
+      if ((error= table->file->index_next(record_buf)))
+      {
+	table->file->print_error(error, MYF(0));
+        DBUG_RETURN(error);
+      }
+    }
+  }
+  else
+  {
+    /* Continue until we find the right record or have made a full loop */
+    int restart_count= 0; // Number of times scanning has restarted from top
+    int error= 0;
+    do
+    {
+      error= table->file->rnd_next(record_buf);
+      switch (error)
+      {
+      case 0:
+      case HA_ERR_RECORD_DELETED:
+	break;
+
+      case HA_ERR_END_OF_FILE:
+	if (++restart_count < 2)
+	  table->file->ha_rnd_init(1);
+	break;
+
+      default:
+	table->file->print_error(error, MYF(0));
+	DBUG_RETURN(error);
+      }
+    }
+    while (restart_count < 2 &&
+           record_compare(table, table->record[0], record_buf) != 0);
+
+    DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
+    DBUG_RETURN(error);
+  }
+
+  DBUG_RETURN(0);
+}
+#endif
+
+/*
+  Constructor used to build an event for writing to the binary log.
+  The search buffers (m_memory/m_key/m_search_record) are allocated
+  lazily in do_before_row_operations(), so they start out NULL.
+ */
+
+#ifndef MYSQL_CLIENT
+Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
+                                             ulong tid, MY_BITMAP const *cols,
+                                             bool is_transactional)
+  : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional)
+#ifdef HAVE_REPLICATION
+    ,m_memory(NULL), m_key(NULL), m_search_record(NULL)
+#endif
+{
+}
+#endif /* #if !defined(MYSQL_CLIENT) */
+
+/*
+  Constructor used by slave to read the event from the binary log.
+  Parsing is delegated to the Rows_log_event base; the server-only
+  search buffers start out NULL.
+ */
+#ifdef HAVE_REPLICATION
+Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len,
+                                             const Format_description_log_event
+                                             *description_event)
+#if defined(MYSQL_CLIENT)
+  : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
+#else
+  : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event),
+    m_memory(NULL), m_key(NULL), m_search_record(NULL)
+#endif
+{
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+/*
+  Per-statement setup before deleting rows: allocate the search buffers
+  needed by find_and_fetch_row() (none for engines with random access by
+  PK; record+key for indexed search; record only for a table scan) and
+  initialize the corresponding index or scan cursor.
+  Returns 0 on success, HA_ERR_OUT_OF_MEM or a handler error otherwise.
+*/
+int Delete_rows_log_event::do_before_row_operations(TABLE *table)
+{
+  DBUG_ASSERT(m_memory == NULL);
+
+  if ((table->file->table_flags() & HA_PRIMARY_KEY_ALLOW_RANDOM_ACCESS) &&
+      table->s->primary_key < MAX_KEY)
+  {
+    /*
+      We don't need to allocate any memory for m_search_record and
+      m_key since they are not used.
+    */
+    return 0;
+  }
+
+  int error= 0;
+
+  if (table->s->keys > 0)
+  {
+    m_memory=
+      my_multi_malloc(MYF(MY_WME),
+		      &m_search_record, table->s->reclength,
+		      &m_key, table->key_info->key_length,
+		      NULL);
+  }
+  else
+  {
+    m_memory= m_search_record= my_malloc(table->s->reclength, MYF(MY_WME));
+    m_key= NULL;
+  }
+  if (!m_memory)
+    return HA_ERR_OUT_OF_MEM;
+
+  if (table->s->keys > 0)
+  {
+    /* We have a key: search the table using the index */
+    if (!table->file->inited)
+      error= table->file->ha_index_init(0, FALSE);
+  }
+  else
+  {
+    /* We don't have a key: search the table using rnd_next() */
+    error= table->file->ha_rnd_init(1);
+  }
+
+  return error;
+}
+
+/*
+  Per-statement cleanup: close any index/scan cursor and release the
+  search buffers allocated in do_before_row_operations().
+*/
+int Delete_rows_log_event::do_after_row_operations(TABLE *table, int error)
+{
+  /*error= ToDo:find out what this should really be, this triggers close_scan in ndb, returning error?*/
+  table->file->ha_index_or_rnd_end();
+  my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc
+  m_memory= m_search_record= m_key= NULL;
+
+  return error;
+}
+
+/*
+  Unpack the row image to delete into table->record[0] and, when an
+  indexed search will be used (m_key != NULL), extract the search key
+  from it.  Returns a pointer just past the consumed bytes.
+*/
+char const *Delete_rows_log_event::do_prepare_row(THD *thd, TABLE *table,
+                                                  char const *row_start)
+{
+  char const *ptr= row_start;
+  DBUG_ASSERT(ptr);
+  /*
+    This assertion actually checks that there is at least as many
+    columns on the slave as on the master.
+  */
+  DBUG_ASSERT(table->s->fields >= m_width);
+
+  DBUG_ASSERT(ptr != NULL);
+  ptr= unpack_row(table, table->record[0], ptr, &m_cols);
+
+  /*
+    If we will access rows using the random access method, m_key will
+    be set to NULL, so we do not need to make a key copy in that case.
+   */
+  if (m_key)
+  {
+    KEY *const key_info= table->key_info;
+
+    key_copy(m_key, table->record[0], key_info, 0);
+  }
+
+  return ptr;
+}
+
+/*
+  Apply one prepared row: locate the matching row in the table and
+  delete it.  Returns 0 on success, a handler error code otherwise.
+*/
+int Delete_rows_log_event::do_exec_row(TABLE *table)
+{
+  DBUG_ASSERT(table != NULL);
+
+  int error= find_and_fetch_row(table, m_key, m_search_record);
+  if (error)
+    return error;
+
+  /*
+     Now we should have the right row to delete.  We are using
+     record[0] since it is guaranteed to point to a record with the
+     correct value.
+  */
+  error= table->file->ha_delete_row(table->record[0]);
+
+  return error;
+}
+
+#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
+
+#ifdef MYSQL_CLIENT
+/* mysqlbinlog output: header plus base64 body (skipped in short form). */
+void Delete_rows_log_event::print(FILE *file,
+                                  PRINT_EVENT_INFO* print_event_info)
+{
+  if (!print_event_info->short_form)
+  {
+    print_header(file, print_event_info);
+    fprintf(file, "\tDelete_rows: table id %lu", m_table_id);
+    print_base64(file, print_event_info);
+  }
+}
+#endif
+
+
+/**************************************************************************
+ Update_rows_log_event member functions
+**************************************************************************/
+
+/*
+  Constructor used to build an event for writing to the binary log.
+  The search buffers (m_memory/m_key) are allocated lazily in
+  do_before_row_operations(), so they start out NULL.
+ */
+#if !defined(MYSQL_CLIENT)
+Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
+                                             ulong tid, MY_BITMAP const *cols,
+                                             bool is_transactional)
+: Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional)
+#ifdef HAVE_REPLICATION
+  , m_memory(NULL), m_key(NULL)
+#endif
+{
+}
+#endif /* !defined(MYSQL_CLIENT) */
+
+/*
+  Constructor used by slave to read the event from the binary log.
+  Parsing is delegated to the Rows_log_event base; the server-only
+  search buffers start out NULL.
+ */
+#ifdef HAVE_REPLICATION
+Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len,
+                                             const
+                                             Format_description_log_event
+                                             *description_event)
+#if defined(MYSQL_CLIENT)
+  : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
+#else
+  : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event),
+    m_memory(NULL), m_key(NULL)
+#endif
+{
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+/*
+  Prepare the table handler for applying a batch of update rows.
+
+  Allocates the scratch buffers used by do_prepare_row()/do_exec_row():
+  m_search_record holds an unpacked record image used for searching,
+  and m_key (only when the table has keys) holds a copy of the search
+  key. Also initializes the handler for index or full-table scanning.
+
+  Returns 0 on success, or a handler error code.
+*/
+int Update_rows_log_event::do_before_row_operations(TABLE *table)
+{
+  /* Must not be called twice without an intervening do_after_row_operations() */
+  DBUG_ASSERT(m_memory == NULL);
+
+  if ((table->file->table_flags() & HA_PRIMARY_KEY_ALLOW_RANDOM_ACCESS) &&
+      table->s->primary_key < MAX_KEY)
+  {
+    /*
+      We don't need to allocate any memory for m_search_record and
+      m_key since they are not used.
+    */
+    /* NOTE(review): this path also skips ha_index_init/ha_rnd_init and
+       the timestamp_field_type assignment below — presumably the
+       random-access path in find_and_fetch_row() needs neither; confirm. */
+    return 0;
+  }
+
+  int error= 0;
+
+  if (table->s->keys > 0)
+  {
+    /* One allocation for both buffers; key length taken from key 0 */
+    m_memory=
+      my_multi_malloc(MYF(MY_WME),
+                      &m_search_record, table->s->reclength,
+                      &m_key, table->key_info->key_length,
+                      NULL);
+  }
+  else
+  {
+    m_memory= m_search_record= my_malloc(table->s->reclength, MYF(MY_WME));
+    m_key= NULL;
+  }
+  if (!m_memory)
+    return HA_ERR_OUT_OF_MEM;
+
+  if (table->s->keys > 0)
+  {
+    /* We have a key: search the table using the index */
+    if (!table->file->inited)
+      error= table->file->ha_index_init(0, FALSE);
+  }
+  else
+  {
+    /* We don't have a key: search the table using rnd_next() */
+    error= table->file->ha_rnd_init(1);
+  }
+  /* Applied rows must keep the replicated timestamp, not get a new one */
+  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
+
+  return error;
+}
+
+int Update_rows_log_event::do_after_row_operations(TABLE *table, int error)
+{
+  /*
+    TODO: find out what the returned error should really be here; this
+    call triggers close_scan in NDB, which may itself return an error.
+  */
+  table->file->ha_index_or_rnd_end();
+
+  /* Release the scratch buffers allocated in do_before_row_operations() */
+  my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR));
+  m_memory= NULL;
+  m_search_record= NULL;
+  m_key= NULL;
+
+  return error;
+}
+
+/*
+  Unpack one update entry (before image + after image) from the row
+  buffer into table->record[0]/record[1], and copy the search key for
+  the before image into m_key when index access will be used.
+
+  Returns a pointer just past the consumed portion of the row buffer.
+*/
+char const *Update_rows_log_event::do_prepare_row(THD *thd, TABLE *table,
+                                                  char const *row_start)
+{
+  char const *ptr= row_start;
+  DBUG_ASSERT(ptr);
+  /*
+    This assertion actually checks that there is at least as many
+    columns on the slave as on the master.
+  */
+  DBUG_ASSERT(table->s->fields >= m_width);
+
+  /* record[0] is the before image for the update */
+  ptr= unpack_row(table, table->record[0], ptr, &m_cols);
+  DBUG_ASSERT(ptr != NULL);
+  /* record[1] is the after image for the update */
+  /* NOTE(review): the second unpack_row() result is not asserted
+     non-NULL like the first — confirm whether that is intentional. */
+  ptr= unpack_row(table, table->record[1], ptr, &m_cols);
+
+  /*
+    If we will access rows using the random access method, m_key will
+    be set to NULL, so we do not need to make a key copy in that case.
+  */
+  if (m_key)
+  {
+    KEY *const key_info= table->key_info;
+
+    /* Key is built from the before image, which is what we search for */
+    key_copy(m_key, table->record[0], key_info, 0);
+  }
+
+  return ptr;
+}
+
+int Update_rows_log_event::do_exec_row(TABLE *table)
+{
+  DBUG_ASSERT(table != NULL);
+
+  /*
+    Locate the row to change. On failure, propagate the handler error
+    without performing the update.
+  */
+  int const locate_error= find_and_fetch_row(table, m_key, m_search_record);
+  if (locate_error)
+    return locate_error;
+
+  /*
+    The fetched row (before image) is guaranteed to be in record[0],
+    and the after image in record[1]; hand both to the handler.
+  */
+  return table->file->ha_update_row(table->record[0], table->record[1]);
+}
+#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
+
+#ifdef MYSQL_CLIENT
+void Update_rows_log_event::print(FILE *file,
+                                  PRINT_EVENT_INFO* print_event_info)
+{
+  /* Short form suppresses the event header and row data entirely. */
+  if (print_event_info->short_form)
+    return;
+
+  print_header(file, print_event_info);
+  fprintf(file, "\tUpdate_rows: table id %lu", m_table_id);
+  print_base64(file, print_event_info);
+}
+#endif
+
+#endif /* defined(HAVE_ROW_BASED_REPLICATION) */