diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 1914 |
1 files changed, 1860 insertions, 54 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index c8f8ff40700..6e256a0c295 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -21,11 +21,14 @@ #pragma implementation // gcc: Class implementation #endif -#include "mysql_priv.h" +#include "mysql_priv.h" #include "slave.h" #include "rpl_filter.h" #include <my_dir.h> #endif /* MYSQL_CLIENT */ +#include <base64.h> +#include <my_bitmap.h> +#include <my_vle.h> #define log_cs &my_charset_latin1 @@ -232,6 +235,7 @@ char *str_to_hex(char *to, const char *from, uint len) commands just before it prints a query. */ +#ifdef MYSQL_CLIENT static void print_set_option(FILE* file, uint32 bits_changed, uint32 option, uint32 flags, const char* name, bool* need_comma) { @@ -243,6 +247,7 @@ static void print_set_option(FILE* file, uint32 bits_changed, uint32 option, *need_comma= 1; } } +#endif /************************************************************************** Log_event methods (= the parent class of all events) @@ -271,6 +276,10 @@ const char* Log_event::get_type_str() case XID_EVENT: return "Xid"; case USER_VAR_EVENT: return "User var"; case FORMAT_DESCRIPTION_EVENT: return "Format_desc"; + case TABLE_MAP_EVENT: return "Table_map"; + case WRITE_ROWS_EVENT: return "Write_rows"; + case UPDATE_ROWS_EVENT: return "Update_rows"; + case DELETE_ROWS_EVENT: return "Delete_rows"; case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query"; case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query"; default: return "Unknown"; /* impossible */ @@ -778,6 +787,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, DBUG_RETURN(NULL); // general sanity check - will fail on a partial read } + /* To check the integrity of the Log_event_type enumeration */ + DBUG_ASSERT(buf[EVENT_TYPE_OFFSET] < ENUM_END_EVENT); + switch(buf[EVENT_TYPE_OFFSET]) { case QUERY_EVENT: ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT); @@ -829,6 +841,20 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case FORMAT_DESCRIPTION_EVENT: ev = new Format_description_log_event(buf, event_len, description_event); break; +#if defined(HAVE_REPLICATION) && defined(HAVE_ROW_BASED_REPLICATION) + case WRITE_ROWS_EVENT: + ev = new Write_rows_log_event(buf, event_len, description_event); + break; + case UPDATE_ROWS_EVENT: + ev = new Update_rows_log_event(buf, event_len, description_event); + break; + case DELETE_ROWS_EVENT: + ev = new Delete_rows_log_event(buf, event_len, description_event); + break; + case TABLE_MAP_EVENT: + ev = new Table_map_log_event(buf, event_len, description_event); + break; +#endif case BEGIN_LOAD_QUERY_EVENT: ev = new Begin_load_query_log_event(buf, event_len, description_event); break; @@ -952,6 +978,24 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) } +void Log_event::print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info) +{ + uchar *ptr= (uchar*)temp_buf; + my_off_t size= uint4korr(ptr + EVENT_LEN_OFFSET); + + char *tmp_str= + (char *) my_malloc(base64_needed_encoded_length(size), MYF(MY_WME)); + if (!tmp_str) { + fprintf(stderr, "\nError: Out of memory. " + "Could not print correct binlog event.\n"); + return; + } + int res= base64_encode(ptr, size, tmp_str); + fprintf(file, "\nBINLOG '\n%s\n';\n", tmp_str); + my_free(tmp_str, MYF(0)); +} + + /* Log_event::print_timestamp() */ @@ -1714,7 +1758,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli, const char *query clear_all_errors(thd, rli); /* Can ignore query */ else { - slave_print_error(rli,expected_error, + slave_print_msg(ERROR_LEVEL, rli, expected_error, "\ Query partially completed on the master (error on master: %d) \ and was aborted. There is a chance that your master is inconsistent at this \ @@ -1743,16 +1787,16 @@ compare_errors: !ignored_error_code(actual_error) && !ignored_error_code(expected_error)) { - slave_print_error(rli, 0, - "\ -Query caused different errors on master and slave. \ + slave_print_msg(ERROR_LEVEL, rli, 0, + "\ +Query caused different errors on master and slave. \ Error on master: '%s' (%d), Error on slave: '%s' (%d). \ Default database: '%s'. Query: '%s'", - ER_SAFE(expected_error), - expected_error, - actual_error ? thd->net.last_error: "no error", - actual_error, - print_slave_db_safe(db), query_arg); + ER_SAFE(expected_error), + expected_error, + actual_error ? thd->net.last_error: "no error", + actual_error, + print_slave_db_safe(db), query_arg); thd->query_error= 1; } /* @@ -1769,11 +1813,11 @@ Default database: '%s'. Query: '%s'", */ else if (thd->query_error || thd->is_fatal_error) { - slave_print_error(rli,actual_error, - "Error '%s' on query. Default database: '%s'. Query: '%s'", - (actual_error ? thd->net.last_error : - "unexpected success or fatal error"), - print_slave_db_safe(thd->db), query_arg); + slave_print_msg(ERROR_LEVEL, rli, actual_error, + "Error '%s' on query. Default database: '%s'. Query: '%s'", + (actual_error ? thd->net.last_error : + "unexpected success or fatal error"), + print_slave_db_safe(thd->db), query_arg); thd->query_error= 1; } @@ -2055,6 +2099,25 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN; post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1]; post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN; + post_header_len[TABLE_MAP_EVENT-1]= TABLE_MAP_HEADER_LEN; + post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN; + post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN; + post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN; + /* + We here have the possibility to simulate a master of before we changed + the table map id to be stored in 6 bytes: when it was stored in 4 + bytes (=> post_header_len was 6). This is used to test backward + compatibility. + This code can be removed after a few months (today is Dec 21st 2005), + when we know that the 4-byte masters are not deployed anymore (check + with Tomas Ulin first!), and the accompanying test (rpl_row_4_bytes) + too. + */ + DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", + post_header_len[TABLE_MAP_EVENT-1]= + post_header_len[WRITE_ROWS_EVENT-1]= + post_header_len[UPDATE_ROWS_EVENT-1]= + post_header_len[DELETE_ROWS_EVENT-1]= 6;); post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1]; post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN; } @@ -2189,10 +2252,8 @@ int Format_description_log_event::exec_event(struct st_relay_log_info* rli) As a transaction NEVER spans on 2 or more binlogs: if we have an active transaction at this point, the master died while writing the transaction to the binary log, i.e. while - flushing the binlog cache to the binlog. As the write was started, - the transaction had been committed on the master, so we lack of - information to replay this transaction on the slave; all we can do - is stop with error. + flushing the binlog cache to the binlog. XA guarantees that master has + rolled back. So we roll back. Note: this event could be sent by the master to inform us of the format of its binlog; in other words maybe it is not at its original place when it comes to us; we'll know this by checking @@ -2200,11 +2261,13 @@ int Format_description_log_event::exec_event(struct st_relay_log_info* rli) */ if (!artificial_event && created && thd->transaction.all.nht) { - slave_print_error(rli, 0, "Rolling back unfinished transaction (no " - "COMMIT or ROLLBACK) from relay log. A probable cause " - "is that the master died while writing the transaction " - "to its binary log."); - end_trans(thd, ROLLBACK); + /* This is not an error (XA is safe), just an information */ + slave_print_msg(INFORMATION_LEVEL, rli, 0, + "Rolling back unfinished transaction (no COMMIT " + "or ROLLBACK in relay log). A probable cause is that " + "the master died while writing the transaction to " + "its binary log, thus rolled back too."); + rli->cleanup_context(thd, 1); } #endif /* @@ -2751,6 +2814,9 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, thd->query_length= 0; // Should not be needed thd->query_error= 0; clear_all_errors(thd, rli); + + /* see Query_log_event::exec_event() and BUG#13360 */ + DBUG_ASSERT(!rli->m_table_map.count()); /* Usually mysql_init_query() is called by mysql_parse(), but we need it here as the present method does not call mysql_parse(). @@ -2962,9 +3028,9 @@ error: sql_errno=ER_UNKNOWN_ERROR; err=ER(sql_errno); } - slave_print_error(rli,sql_errno,"\ + slave_print_msg(ERROR_LEVEL, rli, sql_errno,"\ Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'", - err, (char*)table_name, print_slave_db_safe(save_db)); + err, (char*)table_name, print_slave_db_safe(save_db)); free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); return 1; } @@ -2972,9 +3038,9 @@ Error '%s' running LOAD DATA INFILE on table '%s'. Default database: '%s'", if (thd->is_fatal_error) { - slave_print_error(rli,ER_UNKNOWN_ERROR, "\ + slave_print_msg(ERROR_LEVEL, rli, ER_UNKNOWN_ERROR, "\ Fatal error running LOAD DATA INFILE on table '%s'. Default database: '%s'", - (char*)table_name, print_slave_db_safe(save_db)); + (char*)table_name, print_slave_db_safe(save_db)); return 1; } @@ -3035,8 +3101,7 @@ void Rotate_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) #ifndef MYSQL_CLIENT -Rotate_log_event::Rotate_log_event(THD* thd_arg, - const char* new_log_ident_arg, +Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg, uint ident_len_arg, ulonglong pos_arg, uint flags_arg) :Log_event(), new_log_ident(new_log_ident_arg), @@ -3045,7 +3110,7 @@ Rotate_log_event::Rotate_log_event(THD* thd_arg, { #ifndef DBUG_OFF char buff[22]; - DBUG_ENTER("Rotate_log_event::Rotate_log_event(THD*,...)"); + DBUG_ENTER("Rotate_log_event::Rotate_log_event(...,flags)"); DBUG_PRINT("enter",("new_log_ident %s pos %s flags %lu", new_log_ident_arg, llstr(pos_arg, buff), flags)); #endif @@ -3353,12 +3418,24 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli) Xid_log_event methods **************************************************************************/ +#if !defined(DBUG_OFF) && !defined(MYSQL_CLIENT) +/* + This static class member could be removed when mysqltest is made to support + a --replace-regex command: then tests which have XIDs in their output can + use this command to suppress non-deterministic XID values. +*/ +my_bool Xid_log_event::show_xid; +#endif + #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) void Xid_log_event::pack_info(Protocol *protocol) { char buf[128], *pos; pos= strmov(buf, "COMMIT /* xid="); - pos= longlong10_to_str(xid, pos, 10); +#if !defined(DBUG_OFF) && !defined(MYSQL_CLIENT) + if (show_xid) +#endif + pos= longlong10_to_str(xid, pos, 10); pos= strmov(pos, " */"); protocol->store(buf, (uint) (pos-buf), &my_charset_bin); } @@ -4179,7 +4256,8 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli) init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0, MYF(MY_WME|MY_NABP))) { - slave_print_error(rli,my_errno, "Error in Create_file event: could not open file '%s'", fname_buf); + slave_print_msg(ERROR_LEVEL, rli, my_errno, "Error in Create_file event: " + "could not open file '%s'", fname_buf); goto err; } @@ -4190,9 +4268,9 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli) if (write_base(&file)) { strmov(p, ".info"); // to have it right in the error message - slave_print_error(rli,my_errno, - "Error in Create_file event: could not write to file '%s'", - fname_buf); + slave_print_msg(ERROR_LEVEL, rli, my_errno, + "Error in Create_file event: could not write to file '%s'", + fname_buf); goto err; } end_io_cache(&file); @@ -4204,12 +4282,14 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli) O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, MYF(MY_WME))) < 0) { - slave_print_error(rli,my_errno, "Error in Create_file event: could not open file '%s'", fname_buf); + slave_print_msg(ERROR_LEVEL, rli, my_errno, "Error in Create_file event: " + "could not open file '%s'", fname_buf); goto err; } if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) { - slave_print_error(rli,my_errno, "Error in Create_file event: write to '%s' failed", fname_buf); + slave_print_msg(ERROR_LEVEL, rli, my_errno, "Error in Create_file event: " + "write to '%s' failed", fname_buf); goto err; } error=0; // Everything is ok @@ -4348,25 +4428,25 @@ int Append_block_log_event::exec_event(struct st_relay_log_info* rli) O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, MYF(MY_WME))) < 0) { - slave_print_error(rli, my_errno, - "Error in %s event: could not create file '%s'", - get_type_str(), fname); + slave_print_msg(ERROR_LEVEL, rli, my_errno, + "Error in %s event: could not create file '%s'", + get_type_str(), fname); goto err; } } else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW, MYF(MY_WME))) < 0) { - slave_print_error(rli, my_errno, - "Error in %s event: could not open file '%s'", - get_type_str(), fname); + slave_print_msg(ERROR_LEVEL, rli, my_errno, + "Error in %s event: could not open file '%s'", + get_type_str(), fname); goto err; } if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) { - slave_print_error(rli, my_errno, - "Error in %s event: write to '%s' failed", - get_type_str(), fname); + slave_print_msg(ERROR_LEVEL, rli, my_errno, + "Error in %s event: write to '%s' failed", + get_type_str(), fname); goto err; } error=0; @@ -4573,7 +4653,8 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0, MYF(MY_WME|MY_NABP))) { - slave_print_error(rli,my_errno, "Error in Exec_load event: could not open file '%s'", fname); + slave_print_msg(ERROR_LEVEL, rli, my_errno, "Error in Exec_load event: " + "could not open file '%s'", fname); goto err; } if (!(lev = (Load_log_event*)Log_event::read_log_event(&file, @@ -4581,7 +4662,8 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) rli->relay_log.description_event_for_exec)) || lev->get_type_code() != NEW_LOAD_EVENT) { - slave_print_error(rli,0, "Error in Exec_load event: file '%s' appears corrupted", fname); + slave_print_msg(ERROR_LEVEL, rli, 0, "Error in Exec_load event: " + "file '%s' appears corrupted", fname); goto err; } @@ -4607,10 +4689,10 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) char *tmp= my_strdup(rli->last_slave_error,MYF(MY_WME)); if (tmp) { - slave_print_error(rli, - rli->last_slave_errno, /* ok to re-use error code */ - "%s. Failed executing load from '%s'", - tmp, fname); + slave_print_msg(ERROR_LEVEL, rli, + rli->last_slave_errno, /* ok to re-use error code */ + "%s. Failed executing load from '%s'", + tmp, fname); my_free(tmp,MYF(0)); } goto err; @@ -4816,7 +4898,7 @@ Execute_load_query_log_event::exec_event(struct st_relay_log_info* rli) if (!(buf = my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) + (FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME)))) { - slave_print_error(rli, my_errno, "Not enough memory"); + slave_print_msg(ERROR_LEVEL, rli, my_errno, "Not enough memory"); return 1; } @@ -4942,3 +5024,1727 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) } return buf; } + + +#ifdef HAVE_ROW_BASED_REPLICATION + +/************************************************************************** + Rows_log_event member functions +**************************************************************************/ + +#ifndef MYSQL_CLIENT +Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, + MY_BITMAP const *cols, bool is_transactional) + : Log_event(thd_arg, 0, is_transactional), + m_table(tbl_arg), + m_table_id(tid), + m_width(tbl_arg->s->fields), + m_rows_buf(my_malloc(opt_binlog_rows_event_max_size * sizeof(*m_rows_buf), MYF(MY_WME))), + m_rows_cur(m_rows_buf), + m_rows_end(m_rows_buf + opt_binlog_rows_event_max_size), + m_flags(0) +{ + DBUG_ASSERT(m_table && m_table->s); + DBUG_ASSERT(m_table_id != ULONG_MAX); + + if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS) + set_flags(NO_FOREIGN_KEY_CHECKS_F); + if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS) + set_flags(RELAXED_UNIQUE_CHECKS_F); + /* if bitmap_init fails, catched in is_valid() */ + if (likely(!bitmap_init(&m_cols, + m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL, + (m_width + 7) & ~7UL, + false))) + memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols)); + else + m_cols.bitmap= 0; // to not free it +} +#endif + +Rows_log_event::Rows_log_event(const char *buf, uint event_len, + Log_event_type event_type, + const Format_description_log_event + *description_event) + : Log_event(buf, description_event), + m_rows_buf(0), m_rows_cur(0), m_rows_end(0) +{ + DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)"); + uint8 const common_header_len= description_event->common_header_len; + uint8 const post_header_len= description_event->post_header_len[event_type-1]; + + DBUG_PRINT("enter",("event_len=%ld, common_header_len=%d, " + "post_header_len=%d", + event_len, common_header_len, + post_header_len)); + + const char *post_start= buf + common_header_len; + post_start+= RW_MAPID_OFFSET; + if (post_header_len == 6) + { + /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */ + m_table_id= uint4korr(post_start); + post_start+= 4; + } + else + { + m_table_id= uint6korr(post_start); + post_start+= RW_FLAGS_OFFSET; + } + + DBUG_ASSERT(m_table_id != ULONG_MAX); + + m_flags= uint2korr(post_start); + + byte const *const var_start= buf + common_header_len + post_header_len; + byte const *const ptr_width= var_start; + byte const *const ptr_after_width= my_vle_decode(&m_width, ptr_width); + + const uint byte_count= (m_width + 7) / 8; + const char* const ptr_rows_data= var_start + byte_count + 1; + + my_size_t const data_size= event_len - (ptr_rows_data - buf); + DBUG_PRINT("info",("m_table_id=%lu, m_flags=%d, m_width=%u, data_size=%lu", + m_table_id, m_flags, m_width, data_size)); + + m_rows_buf= my_malloc(data_size, MYF(MY_WME)); + if (likely((bool)m_rows_buf)) + { + /* if bitmap_init fails, catched in is_valid() */ + if (likely(!bitmap_init(&m_cols, + m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL, + (m_width + 7) & ~7UL, + false))) + memcpy(m_cols.bitmap, ptr_after_width, byte_count); + m_rows_end= m_rows_buf + data_size; + m_rows_cur= m_rows_end; + memcpy(m_rows_buf, ptr_rows_data, data_size); + } + else + m_cols.bitmap= 0; // to not free it + + DBUG_VOID_RETURN; +} + +Rows_log_event::~Rows_log_event() +{ + if (m_cols.bitmap == m_bitbuf) // no my_malloc happened + m_cols.bitmap= 0; // so no my_free in bitmap_free + bitmap_free(&m_cols); // To pair with bitmap_init(). + my_free(m_rows_buf, MYF(MY_ALLOW_ZERO_PTR)); +} + +#ifndef MYSQL_CLIENT +int Rows_log_event::do_add_row_data(byte *const row_data, + my_size_t const length) +{ + /* + When the table has a primary key, we would probably want, by default, to + log only the primary key value instead of the entire "before image". This + would save binlog space. TODO + */ + DBUG_ENTER("Rows_log_event::do_add_row_data(byte *data, my_size_t length)"); + DBUG_PRINT("enter", ("row_data= %p, length= %lu", row_data, length)); + DBUG_DUMP("row_data", row_data, min(length, 32)); + + DBUG_ASSERT(m_rows_buf <= m_rows_cur); + DBUG_ASSERT(m_rows_buf < m_rows_end); + DBUG_ASSERT(m_rows_cur <= m_rows_end); + + /* The cast will always work since m_rows_cur <= m_rows_end */ + if (static_cast<my_size_t>(m_rows_end - m_rows_cur) < length) + { + my_size_t const block_size= 1024; + my_ptrdiff_t const old_alloc= m_rows_end - m_rows_buf; + my_ptrdiff_t const new_alloc= + old_alloc + block_size * (length / block_size + block_size - 1); + my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf; + + byte* const new_buf= my_realloc(m_rows_buf, new_alloc, MYF(MY_WME)); + if (unlikely(!new_buf)) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + /* If the memory moved, we need to move the pointers */ + if (new_buf != m_rows_buf) + { + m_rows_buf= new_buf; + m_rows_cur= m_rows_buf + cur_size; + } + + /* + The end pointer should always be changed to point to the end of + the allocated memory. + */ + m_rows_end= m_rows_buf + new_alloc; + } + + DBUG_ASSERT(m_rows_cur + length < m_rows_end); + memcpy(m_rows_cur, row_data, length); + m_rows_cur+= length; + DBUG_RETURN(0); +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +/* + Unpack a row into a record. The row is assumed to only consist of the fields + for which the bitset represented by 'arr' and 'bits'; the other parts of the + record are left alone. + */ +static char const *unpack_row(TABLE *table, + char *record, char const *row, + MY_BITMAP const *cols) +{ + DBUG_ASSERT(record && row); + + MY_BITMAP *write_set= table->file->write_set; + my_size_t const n_null_bytes= table->s->null_bytes; + my_ptrdiff_t const offset= record - (byte*) table->record[0]; + + memcpy(record, row, n_null_bytes); + char const *ptr= row + n_null_bytes; + + bitmap_set_all(write_set); + Field **const begin_ptr = table->field; + for (Field **field_ptr= begin_ptr ; *field_ptr ; ++field_ptr) + { + Field *const f= *field_ptr; + + if (bitmap_is_set(cols, field_ptr - begin_ptr)) + { + /* Field...::unpack() cannot return 0 */ + ptr= f->unpack(f->ptr + offset, ptr); + } + else + bitmap_clear_bit(write_set, (field_ptr - begin_ptr) + 1); + } + return ptr; +} + +int Rows_log_event::exec_event(st_relay_log_info *rli) +{ + DBUG_ENTER("Rows_log_event::exec_event(st_relay_log_info*)"); + DBUG_ASSERT(m_table_id != ULONG_MAX); + int error= 0; + char const *row_start= m_rows_buf; + TABLE* table= rli->m_table_map.get_table(m_table_id); + + /* + 'thd' has been set by exec_relay_log_event(), just before calling + exec_event(). We still check here to prevent future coding errors. + */ + DBUG_ASSERT(rli->sql_thd == thd); + + /* + lock_tables() reads the contents of thd->lex, so they must be + initialized, so we should call lex_start(); to be even safer, we call + mysql_init_query() which does a more complete set of inits. + */ + mysql_init_query(thd, NULL, 0); + + if (table) + { + /* + table == NULL means that this table should not be + replicated (this was set up by Table_map_log_event::exec_event() which + tested replicate-* rules). + */ + TABLE_LIST table_list; + bool need_reopen; + uint count= 1; + bzero(&table_list, sizeof(table_list)); + table_list.lock_type= TL_WRITE; + table_list.next_global= table_list.next_local= 0; + table_list.table= table; + + for ( ; ; ) + { + table_list.db= const_cast<char*>(table->s->db.str); + table_list.alias= table_list.table_name= + const_cast<char*>(table->s->table_name.str); + + if ((error= lock_tables(thd, &table_list, count, &need_reopen)) == 0) + break; + if (!need_reopen) + { + slave_print_msg(ERROR_LEVEL, rli, error, + "Error in %s event: error during table %s.%s lock", + get_type_str(), table->s->db, table->s->table_name); + DBUG_RETURN(error); + } + /* + we need to store a local copy of the table names since the table object + will become invalid after close_tables_for_reopen + */ + char *db= my_strdup(table->s->db.str, MYF(MY_WME)); + char *table_name= my_strdup(table->s->table_name.str, MYF(MY_WME)); + + if (db == 0 || table_name == 0) + { + /* + Since the lock_tables() failed, the table is not locked, so + we don't need to unlock them. + */ + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + } + + /* + We also needs to flush the pending RBR event, since it keeps a + pointer to an open table. + + ALTERNATIVE SOLUTION: Extract a pointer to the pending RBR + event and reset the table pointer after the tables has been + reopened. + */ + thd->binlog_flush_pending_rows_event(false); + + close_tables_for_reopen(thd, &table_list); + + /* open the table again, same as in Table_map_event::exec_event */ + table_list.db= const_cast<char*>(db); + table_list.alias= table_list.table_name= const_cast<char*>(table_name); + table_list.updating= 1; + TABLE_LIST *tables= &table_list; + if ((error= open_tables(thd, &tables, &count, 0)) == 0) + { + /* reset some variables for the table list*/ + table_list.updating= 0; + /* retrieve the new table reference and update the table map */ + table= table_list.table; + error= rli->m_table_map.set_table(m_table_id, table); + } + else /* error in open_tables */ + { + if (thd->query_error || thd->is_fatal_error) + { + /* + Error reporting borrowed from Query_log_event with many excessive + simplifications (we don't honour --slave-skip-errors) + */ + uint actual_error= thd->net.last_errno; + slave_print_msg(ERROR_LEVEL, rli, actual_error, + "Error '%s' on reopening table `%s`.`%s`", + (actual_error ? thd->net.last_error : + "unexpected success or fatal error"), + db, table_name); + thd->query_error= 1; + } + } + my_free((char*) db, MYF(MY_ALLOW_ZERO_PTR)); + my_free((char*) table_name, MYF(MY_ALLOW_ZERO_PTR)); + + if (error) + DBUG_RETURN(error); + } + + /* + It's not needed to set_time() but + 1) it continues the property that "Time" in SHOW PROCESSLIST shows how + much slave is behind + 2) it will be needed when we allow replication from a table with no + TIMESTAMP column to a table with one. + So we call set_time(), like in SBR. Presently it changes nothing. + */ + thd->set_time((time_t)when); + /* + There are a few flags that are replicated with each row event. + Make sure to set/clear them before executing the main body of + the event. + */ + if (get_flags(NO_FOREIGN_KEY_CHECKS_F)) + thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS; + else + thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; + + if (get_flags(RELAXED_UNIQUE_CHECKS_F)) + thd->options|= OPTION_RELAXED_UNIQUE_CHECKS; + else + thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; + /* A small test to verify that objects have consistent types */ + DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); + + error= do_before_row_operations(table); + while (error == 0 && row_start < m_rows_end) { + char const *row_end= do_prepare_row(thd, table, row_start); + DBUG_ASSERT(row_end != NULL); // cannot happen + DBUG_ASSERT(row_end <= m_rows_end); + + /* in_use can have been set to NULL in close_tables_for_reopen */ + THD* old_thd= table->in_use; + if (!table->in_use) + table->in_use= thd; + error= do_exec_row(table); + table->in_use = old_thd; + switch (error) + { + /* Some recoverable errors */ + case HA_ERR_RECORD_CHANGED: + case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if + tuple does not exist */ + error= 0; + case 0: + break; + + default: + slave_print_msg(ERROR_LEVEL, rli, error, + "Error in %s event: row application failed", + get_type_str()); + thd->query_error= 1; + break; + } + + row_start= row_end; + } + DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event", + rli->abort_slave=1;); + error= do_after_row_operations(table, error); + if (!cache_stmt) + thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; + + } + + if (error) + { /* error has occured during the transaction */ + /* + If one day we honour --skip-slave-errors in row-based replication, and + the error should be skipped, then we would clear mappings, rollback, + close tables, but the slave SQL thread would not stop and then may + assume the mapping is still available, the tables are still open... + So then we should clear mappings/rollback/close here only if this is a + STMT_END_F. + For now we code, knowing that error is not skippable and so slave SQL + thread is certainly going to stop. + */ + rli->cleanup_context(thd, 1); + thd->query_error= 1; + DBUG_RETURN(error); + } + + if (get_flags(STMT_END_F)) + { + /* + This is the end of a statement or transaction, so close (and + unlock) the tables we opened when processing the + Table_map_log_event starting the statement. + + OBSERVER. This will clear *all* mappings, not only those that + are open for the table. There is not good handle for on-close + actions for tables. + + NOTE. Even if we have no table ('table' == 0) we still need to be + here, so that we increase the group relay log position. If we didn't, we + could have a group relay log position which lags behind "forever" + (assume the last master's transaction is ignored by the slave because of + replicate-ignore rules). + */ + thd->binlog_flush_pending_rows_event(true); + /* + If this event is not in a transaction, the call below will, if some + transactional storage engines are involved, commit the statement into + them and flush the pending event to binlog. + If this event is in a transaction, the call will do nothing, but a + Xid_log_event will come next which will, if some transactional engines + are involved, commit the transaction and flush the pending event to the + binlog. + */ + error= ha_autocommit_or_rollback(thd, 0); + /* + Now what if this is not a transactional engine? we still need to + flush the pending event to the binlog; we did it with + thd->binlog_flush_pending_rows_event(). Note that we imitate + what is done for real queries: a call to + ha_autocommit_or_rollback() (sometimes only if involves a + transactional engine), and a call to be sure to have the pending + event flushed. + */ + + rli->cleanup_context(thd, 0); + rli->transaction_end(thd); + + if (error == 0) + { + /* + Clear any errors pushed in thd->net.last_err* if for example "no key + found" (as this is allowed). This is a safety measure; apparently + those errors (e.g. when executing a Delete_rows_log_event of a + non-existing row, like in rpl_row_mystery22.test, + thd->net.last_error = "Can't find record in 't1'" and last_errno=1032) + do not become visible. We still prefer to wipe them out. + */ + thd->clear_error(); + error= Log_event::exec_event(rli); + } + else + slave_print_msg(ERROR_LEVEL, rli, error, + "Error in %s event: commit of row events failed, " + "table `%s`.`%s`", + get_type_str(), table->s->db, table->s->table_name); + DBUG_RETURN(error); + } + + if (table) + { + /* + As "table" is not NULL, we did a successful lock_tables(), without any + prior LOCK TABLES and are not in prelocked mode, so this assertion should + be true. + */ + DBUG_ASSERT(thd->lock); + /* + If we are here, there are more events to come which may use our mappings + and our table. So don't clear mappings or close tables, just unlock + tables. + Why don't we lock the table once for all in + Table_map_log_event::exec_event() ? Because we could have in binlog: + BEGIN; + Table_map t1 -> 1 + Write_rows to id 1 + Table_map t2 -> 2 + Write_rows to id 2 + Xid_log_event + So we cannot lock t1 when executing the first Table_map, because at that + moment we don't know we'll also have to lock t2, and all tables must be + locked at once in MySQL. + */ + mysql_unlock_tables(thd, thd->lock); + thd->lock= 0; + if ((table->s->primary_key == MAX_KEY) && + !cache_stmt) + { + /* + ------------ Temporary fix until WL#2975 is implemented --------- + This event is not the last one (no STMT_END_F). If we stop now (in + case of terminate_slave_thread()), how will we restart? We have to + restart from Table_map_log_event, but as this table is not + transactional, the rows already inserted will still be present, and + idempotency is not guaranteed (no PK) so we risk that repeating leads + to double insert. So we desperately try to continue, hope we'll + eventually leave this buggy situation (by executing the final + Rows_log_event). If we are in a hopeless wait (reached end of last + relay log and nothing gets appended there), we timeout after one + minute, and notify DBA about the problem. + When WL#2975 is implemented, just remove the member + st_relay_log_info::unsafe_to_stop_at and all its occurences. + */ + rli->unsafe_to_stop_at= time(0); + } + } + + DBUG_ASSERT(error == 0); + thd->clear_error(); + rli->inc_event_relay_log_pos(); + + DBUG_RETURN(0); +} +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + +#ifndef MYSQL_CLIENT +bool Rows_log_event::write_data_header(IO_CACHE *file) +{ + DBUG_ASSERT(m_table_id != ULONG_MAX); + byte buf[ROWS_HEADER_LEN]; // No need to init the buffer + DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", + { + int4store(buf + 0, m_table_id); + int2store(buf + 4, m_flags); + return (my_b_safe_write(file, buf, 6)); + }); + int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id); + int2store(buf + RW_FLAGS_OFFSET, m_flags); + return (my_b_safe_write(file, buf, ROWS_HEADER_LEN)); +} + +bool Rows_log_event::write_data_body(IO_CACHE*file) +{ + /* + Note that this should be the number of *bits*, not the number of + bytes. + */ + byte sbuf[my_vle_sizeof(m_width)]; + my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf; + + char *const sbuf_end= my_vle_encode(sbuf, sizeof(sbuf), m_width); + DBUG_ASSERT(static_cast<my_size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); + + return (my_b_safe_write(file, sbuf, sbuf_end - sbuf) || + my_b_safe_write(file, reinterpret_cast<byte*>(m_cols.bitmap), + no_bytes_in_map(&m_cols)) || + my_b_safe_write(file, m_rows_buf, data_size)); +} +#endif + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) && defined(DBUG_RBR) +void Rows_log_event::pack_info(Protocol *protocol) +{ + char buf[256]; + char const *const flagstr= get_flags(STMT_END_F) ? "STMT_END_F" : ""; + char const *const dbnam= m_table->s->db; + char const *const tblnam= m_table->s->table_name; + my_size_t bytes= snprintf(buf, sizeof(buf), + "%s.%s - %s", dbnam, tblnam, flagstr); + protocol->store(buf, bytes, &my_charset_bin); +} +#endif + +/************************************************************************** + Table_map_log_event member functions +**************************************************************************/ + +/* + Constructor used to build an event for writing to the binary log. + Mats says tbl->s lives longer than this event so it's ok to copy pointers + (tbl->s->db etc) and not pointer content. + */ +#if !defined(MYSQL_CLIENT) +Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, + bool is_transactional, uint16 flags) + : Log_event(thd, 0, is_transactional), + m_table(tbl), + m_dbnam(tbl->s->db.str), + m_dblen(m_dbnam ? tbl->s->db.length : 0), + m_tblnam(tbl->s->table_name.str), + m_tbllen(tbl->s->table_name.length), + m_colcnt(tbl->s->fields), m_coltype(0), + m_table_id(tid), + m_flags(flags) +{ + DBUG_ASSERT(m_table_id != ULONG_MAX); + /* + In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in + table.cc / alloc_table_share(): + Use the fact the key is db/0/table_name/0 + As we rely on this let's assert it. + */ + DBUG_ASSERT((tbl->s->db.str == 0) || + (tbl->s->db.str[tbl->s->db.length] == 0)); + DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0); + + + m_data_size= TABLE_MAP_HEADER_LEN; + DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;) + m_data_size+= m_dblen + 2; // Include length and terminating \0 + m_data_size+= m_tbllen + 2; // Include length and terminating \0 + m_data_size+= 1 + m_colcnt; // COLCNT and column types + + /* If malloc fails, catched in is_valid() */ + if ((m_memory= my_malloc(m_colcnt, MYF(MY_WME)))) + { + m_coltype= reinterpret_cast<unsigned char*>(m_memory); + for (unsigned int i= 0 ; i < m_table->s->fields ; ++i) + m_coltype[i]= m_table->field[i]->type(); + } +} +#endif /* !defined(MYSQL_CLIENT) */ + +/* + Constructor used by slave to read the event from the binary log. + */ +#if defined(HAVE_REPLICATION) +Table_map_log_event::Table_map_log_event(const char *buf, uint event_len, + const Format_description_log_event + *description_event) + + : Log_event(buf, description_event), +#ifndef MYSQL_CLIENT + m_table(NULL), +#endif + m_memory(NULL) +{ + DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)"); + + uint8 common_header_len= description_event->common_header_len; + uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1]; + DBUG_PRINT("info",("event_len=%ld, common_header_len=%d, post_header_len=%d", + event_len, common_header_len, post_header_len)); + + DBUG_DUMP("event buffer", buf, event_len); + + /* Read the post-header */ + const char *post_start= buf + common_header_len; + + post_start+= TM_MAPID_OFFSET; + if (post_header_len == 6) + { + /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */ + m_table_id= uint4korr(post_start); + post_start+= 4; + } + else + { + DBUG_ASSERT(post_header_len == TABLE_MAP_HEADER_LEN); + m_table_id= uint6korr(post_start); + post_start+= TM_FLAGS_OFFSET; + } + + DBUG_ASSERT(m_table_id != ULONG_MAX); + + m_flags= uint2korr(post_start); + + /* Read the variable part of the event */ + const char *const vpart= buf + common_header_len + post_header_len; + + /* Extract the length of the various parts from the buffer */ + byte const* const ptr_dblen= vpart + 0; + m_dblen= *(unsigned char*) ptr_dblen; + + /* Length of database name + counter + terminating null */ + byte const* const ptr_tbllen= ptr_dblen + m_dblen + 2; + m_tbllen= *(unsigned char*) ptr_tbllen; + + /* Length of table name + counter + terminating null */ + byte const* const ptr_colcnt= ptr_tbllen + m_tbllen + 2; + byte const* const ptr_after_colcnt= my_vle_decode(&m_colcnt, ptr_colcnt); + + DBUG_PRINT("info",("m_dblen=%d off=%d m_tbllen=%d off=%d m_colcnt=%d off=%d", + m_dblen, ptr_dblen-vpart, m_tbllen, ptr_tbllen-vpart, + m_colcnt, ptr_colcnt-vpart)); + + /* Allocate mem for all fields in one go. If fails, catched in is_valid() */ + m_memory= my_multi_malloc(MYF(MY_WME), + &m_dbnam, m_dblen + 1, + &m_tblnam, m_tbllen + 1, + &m_coltype, m_colcnt, + NULL); + + if (m_memory) + { + /* Copy the different parts into their memory */ + strncpy(const_cast<char*>(m_dbnam), ptr_dblen + 1, m_dblen + 1); + strncpy(const_cast<char*>(m_tblnam), ptr_tbllen + 1, m_tbllen + 1); + memcpy(m_coltype, ptr_after_colcnt, m_colcnt); + } + + DBUG_VOID_RETURN; +} +#endif + +Table_map_log_event::~Table_map_log_event() +{ + my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); +} + +/* + Find a table based on database name and table name. + + DESCRIPTION + + Currently, only the first table of the 'table_list' is located. If the + table is found in the list of open tables for the thread, the 'table' + field of 'table_list' is filled in. + + PARAMETERS + + thd Thread structure + table_list List of tables to locate in the thd->open_tables list. + count Pointer to a variable that will be set to the number of + tables found. If the pointer is NULL, nothing will be stored. + + RETURN VALUE + + The number of tables found. + + TO DO + + Replace the list of table searches with a hash based on the combined + database and table name. The handler_tables_hash is inappropriate since + it hashes on the table alias. At the same time, the function can be + extended to handle a full list of table names, in the same spirit as + open_tables() and lock_tables(). +*/ +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +static uint find_tables(THD *thd, TABLE_LIST *table_list, uint *count) +{ + uint result= 0; + + /* we verify that the caller knows our limitation */ + DBUG_ASSERT(table_list->next_global == 0); + for (TABLE *table= thd->open_tables; table ; table= table->next) + { + if (strcmp(table->s->db.str, table_list->db) == 0 + && strcmp(table->s->table_name.str, table_list->table_name) == 0) + { + /* Copy the table pointer into the table list. */ + table_list->table= table; + result= 1; + break; + } + } + + if (count) + *count= result; + return result; +} +#endif + +/* + Return value is an error code, one of: + + -1 Failure to open table [from open_tables()] + 0 Success + 1 No room for more tables [from set_table()] + 2 Out of memory [from set_table()] + 3 Wrong table definition + 4 Daisy-chaining RBR with SBR not possible + */ + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int Table_map_log_event::exec_event(st_relay_log_info *rli) +{ + DBUG_ENTER("Table_map_log_event::exec_event(st_relay_log_info*)"); + + DBUG_ASSERT(rli->sql_thd == thd); + + /* Step the query id to mark what columns that are actually used. */ + pthread_mutex_lock(&LOCK_thread_count); + thd->query_id= next_query_id(); + pthread_mutex_unlock(&LOCK_thread_count); + + TABLE_LIST table_list; + uint32 dummy_len; + bzero(&table_list, sizeof(table_list)); + table_list.db= const_cast<char *> + (rpl_filter->get_rewrite_db(m_dbnam, &dummy_len)); + table_list.alias= table_list.table_name= const_cast<char*>(m_tblnam); + table_list.lock_type= TL_WRITE; + table_list.next_global= table_list.next_local= 0; + table_list.updating= 1; + + int error= 0; + + if (rpl_filter->db_ok(table_list.db) && + (!rpl_filter->is_on() || rpl_filter->tables_ok("", &table_list))) + { + /* + Check if the slave is set to use SBR. If so, the slave should + stop immediately since it is not possible to daisy-chain from + RBR to SBR. Once RBR is used, the rest of the chain has to use + RBR. + */ + if (mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG) && + !binlog_row_based) + { + slave_print_msg(ERROR_LEVEL, rli, ER_BINLOG_ROW_RBR_TO_SBR, + "It is not possible to use statement-based binlogging " + "on a slave that replicates row-based. Please use " + "--binrow-format=row on slave if you want to use " + "--log-slave-updates and read row-based binlog events."); + DBUG_RETURN(ERR_RBR_TO_SBR); + } + + /* + Open the table if it is not already open and add the table to table map. + If the table should not be replicated, we don't bother to do anything. + The table map will return NULL and the row-level event will effectively + be a no-op. + */ + uint count; + if (find_tables(thd, &table_list, &count) == 0) + { + /* + open_tables() reads the contents of thd->lex, so they must be + initialized, so we should call lex_start(); to be even safer, we call + mysql_init_query() which does a more complete set of inits. + */ + mysql_init_query(thd, NULL, 0); + TABLE_LIST *tables= &table_list; + if ((error= open_tables(thd, &tables, &count, 0))) + { + if (thd->query_error || thd->is_fatal_error) + { + /* + Error reporting borrowed from Query_log_event with many excessive + simplifications (we don't honour --slave-skip-errors) + */ + uint actual_error= thd->net.last_errno; + slave_print_msg(ERROR_LEVEL, rli, actual_error, + "Error '%s' on opening table `%s`.`%s`", + (actual_error ? thd->net.last_error : + "unexpected success or fatal error"), + table_list.db, table_list.table_name); + thd->query_error= 1; + } + DBUG_RETURN(error); + } + } + + m_table= table_list.table; + + /* + This will fail later otherwise, the 'in_use' field should be + set to the current thread. + */ + DBUG_ASSERT(m_table->in_use); + + /* + Check that the number of columns and the field types in the + event match the number of columns and field types in the opened + table. + */ + uint col= m_table->s->fields; + + if (col == m_colcnt) + { + while (col-- > 0) + if (m_table->field[col]->type() != m_coltype[col]) + break; + } + + TABLE_SHARE const *const tsh= m_table->s; + + /* + Check the following termination conditions: + + (col == m_table->s->fields) + ==> (m_table->s->fields != m_colcnt) + (0 <= col < m_table->s->fields) + ==> (m_table->field[col]->type() != m_coltype[col]) + + Logically, A ==> B is equivalent to !A || B + + Since col is unsigned, is suffices to check that col <= + tsh->fields. If col wrapped (by decreasing col when it is 0), + the number will be UINT_MAX, which is greater than tsh->fields. + */ + DBUG_ASSERT(!(col == tsh->fields) || tsh->fields != m_colcnt); + DBUG_ASSERT(!(col < tsh->fields) || + (m_table->field[col]->type() != m_coltype[col])); + + if (col <= tsh->fields) + { + /* + If we get here, the number of columns in the event didn't + match the number of columns in the table on the slave, *or* + there were a column in the table on the slave that did not + have the same type as given in the event. + + If 'col' has the value that was assigned to it, it was a + mismatch between the number of columns on the master and the + slave. + */ + if (col == tsh->fields) + { + DBUG_ASSERT(tsh->db.str && tsh->table_name.str); + slave_print_msg(ERROR_LEVEL, rli, ER_BINLOG_ROW_WRONG_TABLE_DEF, + "Table width mismatch - " + "received %u columns, %s.%s has %u columns", + m_colcnt, tsh->db.str, tsh->table_name.str, tsh->fields); + } + else + { + DBUG_ASSERT(col < m_colcnt && col < tsh->fields); + DBUG_ASSERT(tsh->db.str && tsh->table_name.str); + slave_print_msg(ERROR_LEVEL, rli, ER_BINLOG_ROW_WRONG_TABLE_DEF, + "Column %d type mismatch - " + "received type %d, %s.%s has type %d", + col, m_coltype[col], tsh->db.str, tsh->table_name.str, + m_table->field[col]->type()); + } + + thd->query_error= 1; + DBUG_RETURN(ERR_BAD_TABLE_DEF); + } + + /* + We record in the slave's information that the number m_table_id is + mapped to the m_table object + */ + if (!error) + error= rli->m_table_map.set_table(m_table_id, m_table); + + /* + Tell the RLI that we are touching a table. + + TODO: Maybe we can combine this with the previous operation? + */ + if (!error) + rli->touching_table(m_dbnam, m_tblnam, m_table_id); + } + + /* + We explicitly do not call Log_event::exec_event() here since we do not + want the relay log position to be flushed to disk. The flushing will be + done by the last Rows_log_event that either ends a statement (outside a + transaction) or a transaction. + + A table map event can *never* end a transaction or a statement, so we + just step the relay log position. + */ + + if (likely(!error)) + rli->inc_event_relay_log_pos(); + + DBUG_RETURN(error); +} +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + +#ifndef MYSQL_CLIENT +bool Table_map_log_event::write_data_header(IO_CACHE *file) +{ + DBUG_ASSERT(m_table_id != ULONG_MAX); + byte buf[TABLE_MAP_HEADER_LEN]; + DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", + { + int4store(buf + 0, m_table_id); + int2store(buf + 4, m_flags); + return (my_b_safe_write(file, buf, 6)); + }); + int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id); + int2store(buf + TM_FLAGS_OFFSET, m_flags); + return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN)); +} + +bool Table_map_log_event::write_data_body(IO_CACHE *file) +{ + DBUG_ASSERT(m_dbnam != NULL); + DBUG_ASSERT(m_tblnam != NULL); + /* We use only one byte per length for storage in event: */ + DBUG_ASSERT(m_dblen < 128); + DBUG_ASSERT(m_tbllen < 128); + + byte const dbuf[]= { m_dblen }; + byte const tbuf[]= { m_tbllen }; + + byte cbuf[my_vle_sizeof(m_colcnt)]; + byte *const cbuf_end= my_vle_encode(cbuf, sizeof(cbuf), m_colcnt); + DBUG_ASSERT(static_cast<my_size_t>(cbuf_end - cbuf) <= sizeof(cbuf)); + + return (my_b_safe_write(file, dbuf, sizeof(dbuf)) || + my_b_safe_write(file, m_dbnam, m_dblen+1) || + my_b_safe_write(file, tbuf, sizeof(tbuf)) || + my_b_safe_write(file, m_tblnam, m_tbllen+1) || + my_b_safe_write(file, cbuf, cbuf_end - cbuf) || + my_b_safe_write(file, reinterpret_cast<char*>(m_coltype), m_colcnt)); + } +#endif + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) + +/* + Print some useful information for the SHOW BINARY LOG information + field. + */ + +void Table_map_log_event::pack_info(Protocol *protocol) +{ + char buf[256]; + my_size_t bytes= snprintf(buf, sizeof(buf), "%s.%s", m_dbnam, m_tblnam); + protocol->store(buf, bytes, &my_charset_bin); +} + +#endif + + +#ifdef MYSQL_CLIENT +void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) +{ + if (!print_event_info->short_form) + { + print_header(file, print_event_info); + fprintf(file, "\tTable_map: `%s`.`%s` mapped to number %lu\n", + m_dbnam, m_tblnam, m_table_id); + print_base64(file, print_event_info); + } +} +#endif + +/************************************************************************** + Write_rows_log_event member functions +**************************************************************************/ + +/* + Constructor used to build an event for writing to the binary log. + */ +#if !defined(MYSQL_CLIENT) +Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg, + ulong tid_arg, + MY_BITMAP const *cols, + bool is_transactional) + : Rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional) +{ +} +#endif + +/* + Constructor used by slave to read the event from the binary log. + */ +#ifdef HAVE_REPLICATION +Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len, + const Format_description_log_event + *description_event) +: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event) +{ +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int Write_rows_log_event::do_before_row_operations(TABLE *table) +{ + int error= 0; + + /* + We are using REPLACE semantics and not INSERT IGNORE semantics + when writing rows, that is: new rows replace old rows. We need to + inform the storage engine that it should use this behaviour. + */ + + /* Tell the storage engine that we are using REPLACE semantics. */ + thd->lex->duplicates= DUP_REPLACE; + + /* + Pretend we're executing a REPLACE command: this is needed for + InnoDB and NDB Cluster since they are not (properly) checking the + lex->duplicates flag. + */ + thd->lex->sql_command= SQLCOM_REPLACE; + + table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); // needed for ndbcluster + /* + TODO: the cluster team (Tomas?) says that it's better if the engine knows + how many rows are going to be inserted, then it can allocate needed memory + from the start. + */ + table->file->start_bulk_insert(0); + /* + We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill + any TIMESTAMP column with data from the row but instead will use + the event's current time. + As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra + columns, we know that all TIMESTAMP columns on slave will receive explicit + data from the row, so TIMESTAMP_NO_AUTO_SET is ok. + When we allow a table without TIMESTAMP to be replicated to a table having + more columns including a TIMESTAMP column, or when we allow a TIMESTAMP + column to be replicated into a BIGINT column and the slave's table has a + TIMESTAMP column, then the slave's TIMESTAMP column will take its value + from set_time() which we called earlier (consistent with SBR). And then in + some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to + analyze if explicit data is provided for slave's TIMESTAMP columns). + */ + table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; + return error; +} + +int Write_rows_log_event::do_after_row_operations(TABLE *table, int error) +{ + if (error == 0) + error= table->file->end_bulk_insert(); + return error; +} + +char const *Write_rows_log_event::do_prepare_row(THD *thd, TABLE *table, + char const *row_start) +{ + char const *ptr= row_start; + DBUG_ASSERT(table != NULL); + /* + This assertion actually checks that there is at least as many + columns on the slave as on the master. + */ + DBUG_ASSERT(table->s->fields >= m_width); + DBUG_ASSERT(ptr); + ptr= unpack_row(table, table->record[0], ptr, &m_cols); + return ptr; +} + +/* + Check if there are more UNIQUE keys after the given key. +*/ +static int +last_uniq_key(TABLE *table, uint keyno) +{ + while (++keyno < table->s->keys) + if (table->key_info[keyno].flags & HA_NOSAME) + return 0; + return 1; +} + +/* Anonymous namespace for template functions/classes */ +namespace { + + /* + Smart pointer that will automatically call my_afree (a macro) when + the pointer goes out of scope. This is used so that I do not have + to remember to call my_afree() before each return. There is no + overhead associated with this, since all functions are inline. + + I (Matz) would prefer to use the free function as a template + parameter, but that is not possible when the "function" is a + macro. + */ + template <class Obj> + class auto_afree_ptr + { + Obj* m_ptr; + public: + auto_afree_ptr(Obj* ptr) : m_ptr(ptr) { } + ~auto_afree_ptr() { if (m_ptr) my_afree(m_ptr); } + void assign(Obj* ptr) { + /* Only to be called if it hasn't been given a value before. */ + DBUG_ASSERT(m_ptr == NULL); + m_ptr= ptr; + } + Obj* get() { return m_ptr; } + }; + +} + + +/* + Replace the provided record in the database. + + Similar to how it is done in <code>mysql_insert()</code>, we first + try to do a <code>ha_write_row()</code> and of that fails due to + duplicated keys (or indices), we do an <code>ha_update_row()</code> + or a <code>ha_delete_row()</code> instead. + + @param thd Thread context for writing the record. + @param table Table to which record should be written. + + @return Error code on failure, 0 on success. + */ +static int +replace_record(THD *thd, TABLE *table) +{ + DBUG_ASSERT(table != NULL && thd != NULL); + + int error; + int keynum; + auto_afree_ptr<char> key(NULL); + + while ((error= table->file->ha_write_row(table->record[0]))) + { + if ((keynum= table->file->get_dup_key(error)) < 0) + { + /* We failed to retrieve the duplicate key */ + return HA_ERR_FOUND_DUPP_KEY; + } + + /* + We need to retrieve the old row into record[1] to be able to + either update or delete the offending record. We either: + + - use rnd_pos() with a row-id (available as dupp_row) to the + offending row, if that is possible (MyISAM and Blackhole), or else + + - use index_read_idx() with the key that is duplicated, to + retrieve the offending row. + */ + if (table->file->table_flags() & HA_DUPP_POS) + { + error= table->file->rnd_pos(table->record[1], table->file->dupp_ref); + if (error) + return error; + } + else + { + if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) + { + return my_errno; + } + + if (key.get() == NULL) + { + key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length))); + if (key.get() == NULL) + return ENOMEM; + } + + key_copy(key.get(), table->record[0], table->key_info + keynum, 0); + error= table->file->index_read_idx(table->record[1], keynum, key.get(), + table->key_info[keynum].key_length, + HA_READ_KEY_EXACT); + if (error) + return error; + } + + /* + Now, table->record[1] should contain the offending row. That + will enable us to update it or, alternatively, delete it (so + that we can insert the new row afterwards). + + REPLACE is defined as either INSERT or DELETE + INSERT. If + possible, we can replace it with an UPDATE, but that will not + work on InnoDB if FOREIGN KEY checks are necessary. + + I (Matz) am not sure of the reason for the last_uniq_key() + check as, but I'm guessing that it's something along the + following lines. + + Suppose that we got the duplicate key to be a key that is not + the last unique key for the table and we perform an update: + then there might be another key for which the unique check will + fail, so we're better off just deleting the row and inserting + the correct row. + */ + if (last_uniq_key(table, keynum) && + !table->file->referenced_by_foreign_key()) + { + error=table->file->ha_update_row(table->record[1], + table->record[0]); + return error; + } + else + { + if ((error= table->file->ha_delete_row(table->record[1]))) + return error; + /* Will retry ha_write_row() with the offending row removed. */ + } + } + return error; +} + +int Write_rows_log_event::do_exec_row(TABLE *table) +{ + DBUG_ASSERT(table != NULL); + int error= replace_record(thd, table); + return error; +} +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + +#ifdef MYSQL_CLIENT +void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) +{ + if (!print_event_info->short_form) + { + print_header(file, print_event_info); + fprintf(file, "\tWrite_rows: table id %lu", m_table_id); + print_base64(file, print_event_info); + } +} +#endif + +/************************************************************************** + Delete_rows_log_event member functions +**************************************************************************/ + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +static int record_compare(TABLE *table, byte const *a, byte const *b) +{ + for (my_size_t i= 0 ; i < table->s->fields ; ++i) + { + uint const off= table->field[i]->offset(); + uint const res= table->field[i]->cmp_binary(a + off, b + off); + if (res != 0) { + return res; + } + } + return 0; +} + + +/* + Find the row given by 'key', if the table has keys, or else use a table scan + to find (and fetch) the row. If the engine allows random access of the + records, a combination of position() and rnd_pos() will be used. + + The 'record_buf' will be used as buffer for records while locating the + correct row. + */ +static int find_and_fetch_row(TABLE *table, byte *key, byte *record_buf) +{ + DBUG_ENTER("find_and_fetch_row(TABLE *table, byte *key, byte *record)"); + DBUG_PRINT("enter", ("table=%p, key=%p, record=%p", + table, key, record_buf)); + + DBUG_ASSERT(table->in_use != NULL); + + if ((table->file->table_flags() & HA_PRIMARY_KEY_ALLOW_RANDOM_ACCESS) + && table->s->primary_key < MAX_KEY) + { + /* + Use a more efficient method to fetch the record given by + table->record[0] if the engine allows it. We first compute a + row reference using the position() member function (it will be + stored in table->file->ref) and the use rnd_pos() to position + the "cursor" at the correct row. + */ + table->file->position(table->record[0]); + DBUG_RETURN(table->file->rnd_pos(table->record[0], table->file->ref)); + } + + DBUG_ASSERT(record_buf); + + if (table->s->keys > 0) + { + int error; + if ((error= table->file->index_read_idx(record_buf, 0, key, + table->key_info->key_length, + HA_READ_KEY_EXACT))) + { + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + + /* + Below is a minor "optimization". If the key (i.e., key number + 0) has the HA_NOSAME flag set, we know that we have found the + correct record (since there can be no duplicates); otherwise, we + have to compare the record with the one found to see if it is + the correct one. + + CAVEAT! This behaviour is essential for the replication of, + e.g., the mysql.proc table since the correct record *shall* be + found using the primary key *only*. There shall be no + comparison of non-PK columns to decide if the correct record is + found. I can see no scenario where it would be incorrect to + chose the row to change only using a PK or an UNNI. + */ + if (table->key_info->flags & HA_NOSAME) + DBUG_RETURN(0); + + while (record_compare(table, table->record[0], record_buf) != 0) + { + int error; + if ((error= table->file->index_next(record_buf))) + { + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + } + } + else + { + /* Continue until we find the right record or have made a full loop */ + int restart_count= 0; // Number of times scanning has restarted from top + int error= 0; + do + { + error= table->file->rnd_next(record_buf); + switch (error) + { + case 0: + case HA_ERR_RECORD_DELETED: + break; + + case HA_ERR_END_OF_FILE: + if (++restart_count < 2) + table->file->ha_rnd_init(1); + break; + + default: + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + } + while (restart_count < 2 && + record_compare(table, table->record[0], record_buf) != 0); + + DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0); + DBUG_RETURN(error); + } + + DBUG_RETURN(0); +} +#endif + +/* + Constructor used to build an event for writing to the binary log. + */ + +#ifndef MYSQL_CLIENT +Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg, + ulong tid, MY_BITMAP const *cols, + bool is_transactional) + : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional) +#ifdef HAVE_REPLICATION + ,m_memory(NULL), m_key(NULL), m_search_record(NULL) +#endif +{ +} +#endif /* #if !defined(MYSQL_CLIENT) */ + +/* + Constructor used by slave to read the event from the binary log. + */ +#ifdef HAVE_REPLICATION +Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len, + const Format_description_log_event + *description_event) +#if defined(MYSQL_CLIENT) + : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event) +#else + : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event), + m_memory(NULL), m_key(NULL), m_search_record(NULL) +#endif +{ +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int Delete_rows_log_event::do_before_row_operations(TABLE *table) +{ + DBUG_ASSERT(m_memory == NULL); + + if ((table->file->table_flags() & HA_PRIMARY_KEY_ALLOW_RANDOM_ACCESS) && + table->s->primary_key < MAX_KEY) + { + /* + We don't need to allocate any memory for m_search_record and + m_key since they are not used. + */ + return 0; + } + + int error= 0; + + if (table->s->keys > 0) + { + m_memory= + my_multi_malloc(MYF(MY_WME), + &m_search_record, table->s->reclength, + &m_key, table->key_info->key_length, + NULL); + } + else + { + m_memory= m_search_record= my_malloc(table->s->reclength, MYF(MY_WME)); + m_key= NULL; + } + if (!m_memory) + return HA_ERR_OUT_OF_MEM; + + if (table->s->keys > 0) + { + /* We have a key: search the table using the index */ + if (!table->file->inited) + error= table->file->ha_index_init(0, FALSE); + } + else + { + /* We doesn't have a key: search the table using rnd_next() */ + error= table->file->ha_rnd_init(1); + } + + return error; +} + +int Delete_rows_log_event::do_after_row_operations(TABLE *table, int error) +{ + /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ + table->file->ha_index_or_rnd_end(); + my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc + m_memory= m_search_record= m_key= NULL; + + return error; +} + +char const *Delete_rows_log_event::do_prepare_row(THD *thd, TABLE *table, + char const *row_start) +{ + char const *ptr= row_start; + DBUG_ASSERT(ptr); + /* + This assertion actually checks that there is at least as many + columns on the slave as on the master. + */ + DBUG_ASSERT(table->s->fields >= m_width); + + DBUG_ASSERT(ptr != NULL); + ptr= unpack_row(table, table->record[0], ptr, &m_cols); + + /* + If we will access rows using the random access method, m_key will + be set to NULL, so we do not need to make a key copy in that case. + */ + if (m_key) + { + KEY *const key_info= table->key_info; + + key_copy(m_key, table->record[0], key_info, 0); + } + + return ptr; +} + +int Delete_rows_log_event::do_exec_row(TABLE *table) +{ + DBUG_ASSERT(table != NULL); + + int error= find_and_fetch_row(table, m_key, m_search_record); + if (error) + return error; + + /* + Now we should have the right row to delete. We are using + record[0] since it is guaranteed to point to a record with the + correct value. + */ + error= table->file->ha_delete_row(table->record[0]); + + return error; +} + +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + +#ifdef MYSQL_CLIENT +void Delete_rows_log_event::print(FILE *file, + PRINT_EVENT_INFO* print_event_info) +{ + if (!print_event_info->short_form) + { + print_header(file, print_event_info); + fprintf(file, "\tDelete_rows: table id %lu", m_table_id); + print_base64(file, print_event_info); + } +} +#endif + + +/************************************************************************** + Update_rows_log_event member functions +**************************************************************************/ + +/* + Constructor used to build an event for writing to the binary log. + */ +#if !defined(MYSQL_CLIENT) +Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg, + ulong tid, MY_BITMAP const *cols, + bool is_transactional) +: Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional) +#ifdef HAVE_REPLICATION + , m_memory(NULL), m_key(NULL) +#endif +{ +} +#endif /* !defined(MYSQL_CLIENT) */ + +/* + Constructor used by slave to read the event from the binary log. + */ +#ifdef HAVE_REPLICATION +Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len, + const + Format_description_log_event + *description_event) +#if defined(MYSQL_CLIENT) + : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event) +#else + : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event), + m_memory(NULL), m_key(NULL) +#endif +{ +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int Update_rows_log_event::do_before_row_operations(TABLE *table) +{ + DBUG_ASSERT(m_memory == NULL); + + if ((table->file->table_flags() & HA_PRIMARY_KEY_ALLOW_RANDOM_ACCESS) && + table->s->primary_key < MAX_KEY) + { + /* + We don't need to allocate any memory for m_search_record and + m_key since they are not used. + */ + return 0; + } + + int error= 0; + + if (table->s->keys > 0) + { + m_memory= + my_multi_malloc(MYF(MY_WME), + &m_search_record, table->s->reclength, + &m_key, table->key_info->key_length, + NULL); + } + else + { + m_memory= m_search_record= my_malloc(table->s->reclength, MYF(MY_WME)); + m_key= NULL; + } + if (!m_memory) + return HA_ERR_OUT_OF_MEM; + + if (table->s->keys > 0) + { + /* We have a key: search the table using the index */ + if (!table->file->inited) + error= table->file->ha_index_init(0, FALSE); + } + else + { + /* We doesn't have a key: search the table using rnd_next() */ + error= table->file->ha_rnd_init(1); + } + table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; + + return error; +} + +int Update_rows_log_event::do_after_row_operations(TABLE *table, int error) +{ + /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ + table->file->ha_index_or_rnd_end(); + my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); + m_memory= m_search_record= m_key= NULL; + + return error; +} + +char const *Update_rows_log_event::do_prepare_row(THD *thd, TABLE *table, + char const *row_start) +{ + char const *ptr= row_start; + DBUG_ASSERT(ptr); + /* + This assertion actually checks that there is at least as many + columns on the slave as on the master. + */ + DBUG_ASSERT(table->s->fields >= m_width); + + /* record[0] is the before image for the update */ + ptr= unpack_row(table, table->record[0], ptr, &m_cols); + DBUG_ASSERT(ptr != NULL); + /* record[1] is the after image for the update */ + ptr= unpack_row(table, table->record[1], ptr, &m_cols); + + /* + If we will access rows using the random access method, m_key will + be set to NULL, so we do not need to make a key copy in that case. + */ + if (m_key) + { + KEY *const key_info= table->key_info; + + key_copy(m_key, table->record[0], key_info, 0); + } + + return ptr; +} + +int Update_rows_log_event::do_exec_row(TABLE *table) +{ + DBUG_ASSERT(table != NULL); + + int error= find_and_fetch_row(table, m_key, m_search_record); + if (error) + return error; + + /* + Now we should have the right row to update. The record that has + been fetched is guaranteed to be in record[0], so we use that. + */ + error= table->file->ha_update_row(table->record[0], table->record[1]); + + return error; +} +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + +#ifdef MYSQL_CLIENT +void Update_rows_log_event::print(FILE *file, + PRINT_EVENT_INFO* print_event_info) +{ + if (!print_event_info->short_form) + { + print_header(file, print_event_info); + fprintf(file, "\tUpdate_rows: table id %lu", m_table_id); + print_base64(file, print_event_info); + } +} +#endif + +#endif /* defined(HAVE_ROW_BASED_REPLICATION) */ |