summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc323
1 files changed, 203 insertions, 120 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 8a39b1fc4eb..3164a62d876 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -24,6 +24,7 @@
#include "mysql_priv.h"
#include "slave.h"
#include "rpl_filter.h"
+#include "rpl_utility.h"
#include <my_dir.h>
#endif /* MYSQL_CLIENT */
#include <base64.h>
@@ -5258,38 +5259,86 @@ int Rows_log_event::do_add_row_data(byte *const row_data,
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
/*
- Unpack a row into a record. The row is assumed to only consist of the fields
- for which the bitset represented by 'arr' and 'bits'; the other parts of the
+ Unpack a row into a record.
+
+ The row is assumed to only consist of the fields for which the
+ bitset represented by 'arr' and 'bits'; the other parts of the
record are left alone.
+
+ At most 'colcnt' columns are read: if the table is larger than that,
+ the remaining fields are not filled in.
*/
-static char const *unpack_row(TABLE *table,
- byte *record, char const *row,
- MY_BITMAP const *cols)
+static int
+unpack_row(RELAY_LOG_INFO *rli,
+ TABLE *table, uint colcnt, byte *record,
+ char const *row, MY_BITMAP const *cols,
+ char const **row_end, ulong *master_reclength)
{
DBUG_ASSERT(record && row);
-
MY_BITMAP *write_set= table->file->write_set;
+
my_size_t const n_null_bytes= table->s->null_bytes;
my_ptrdiff_t const offset= record - (byte*) table->record[0];
-
- memcpy(record, row, n_null_bytes);
- char const *ptr= row + n_null_bytes;
+ memcpy(record, row, n_null_bytes); // [1]
+ int error= 0;
bitmap_set_all(write_set);
+
Field **const begin_ptr = table->field;
- for (Field **field_ptr= begin_ptr ; *field_ptr ; ++field_ptr)
+ Field **field_ptr;
{
- Field *const f= *field_ptr;
+ char const *ptr= row + n_null_bytes;
+ for (field_ptr= begin_ptr ; *field_ptr ; ++field_ptr)
+ {
+ Field *const f= *field_ptr;
+
+ if (colcnt == 0)
+ break;
- if (bitmap_is_set(cols, field_ptr - begin_ptr))
+ if (bitmap_is_set(cols, field_ptr - begin_ptr))
+ {
+ /* Field...::unpack() cannot return 0 */
+ ptr= f->unpack(f->ptr + offset, ptr);
+ --colcnt;
+ }
+ else
+ bitmap_clear_bit(write_set, (field_ptr - begin_ptr) + 1);
+ }
+
+ *row_end = ptr;
+ if (master_reclength)
+ {
+ if (*field_ptr)
+ *master_reclength = (*field_ptr)->ptr - table->record[0];
+ else
+ *master_reclength = table->s->reclength;
+ }
+ }
+
+ /*
+ Set properties for remaining columns, if there are any. We let the
+ corresponding bit in the write_set be set, to write the value if
+ it was not there already. We iterate over all remaining columns,
+ even if there were an error, to get as many error messages as
+ possible. We are still able to return a pointer to the next row,
+ so wedo that.
+ */
+ for ( ; *field_ptr ; ++field_ptr)
+ {
+ if ((*field_ptr)->flags & (NOT_NULL_FLAG | NO_DEFAULT_VALUE_FLAG))
{
- /* Field...::unpack() cannot return 0 */
- ptr= f->unpack(f->ptr + offset, ptr);
+ slave_print_msg(ERROR_LEVEL, rli, ER_NO_DEFAULT_FOR_FIELD,
+ "Field `%s` of table `%s`.`%s` "
+ "has no default value and cannot be NULL",
+ (*field_ptr)->field_name, table->s->db.str,
+ table->s->table_name.str);
+ error = ER_NO_DEFAULT_FOR_FIELD;
}
else
- bitmap_clear_bit(write_set, (field_ptr - begin_ptr) + 1);
+ (*field_ptr)->set_default();
}
- return ptr;
+
+ return error;
}
int Rows_log_event::exec_event(st_relay_log_info *rli)
@@ -5444,7 +5493,11 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
error= do_before_row_operations(table);
while (error == 0 && row_start < (const char*)m_rows_end) {
- char const *row_end= do_prepare_row(thd, table, row_start);
+ char const *row_end= NULL;
+ if ((error= do_prepare_row(thd, rli, table, row_start, &row_end)))
+ break; // We should to the after-row operation even in the
+ // case of error
+
DBUG_ASSERT(row_end != NULL); // cannot happen
DBUG_ASSERT(row_end <= (const char*)m_rows_end);
@@ -5646,7 +5699,7 @@ void Rows_log_event::pack_info(Protocol *protocol)
#endif
/**************************************************************************
- Table_map_log_event member functions
+ Table_map_log_event member functions and support functions
**************************************************************************/
/*
@@ -5910,71 +5963,9 @@ int Table_map_log_event::exec_event(st_relay_log_info *rli)
*/
DBUG_ASSERT(m_table->in_use);
- /*
- Check that the number of columns and the field types in the
- event match the number of columns and field types in the opened
- table.
- */
- uint col= m_table->s->fields;
-
- if (col == m_colcnt)
+ table_def const def(m_coltype, m_colcnt);
+ if (def.compatible_with(rli, m_table))
{
- while (col-- > 0)
- if (m_table->field[col]->type() != m_coltype[col])
- break;
- }
-
- TABLE_SHARE const *const tsh= m_table->s;
-
- /*
- Check the following termination conditions:
-
- (col == m_table->s->fields)
- ==> (m_table->s->fields != m_colcnt)
- (0 <= col < m_table->s->fields)
- ==> (m_table->field[col]->type() != m_coltype[col])
-
- Logically, A ==> B is equivalent to !A || B
-
- Since col is unsigned, is suffices to check that col <=
- tsh->fields. If col wrapped (by decreasing col when it is 0),
- the number will be UINT_MAX, which is greater than tsh->fields.
- */
- DBUG_ASSERT(!(col == tsh->fields) || tsh->fields != m_colcnt);
- DBUG_ASSERT(!(col < tsh->fields) ||
- (m_table->field[col]->type() != m_coltype[col]));
-
- if (col <= tsh->fields)
- {
- /*
- If we get here, the number of columns in the event didn't
- match the number of columns in the table on the slave, *or*
- there were a column in the table on the slave that did not
- have the same type as given in the event.
-
- If 'col' has the value that was assigned to it, it was a
- mismatch between the number of columns on the master and the
- slave.
- */
- if (col == tsh->fields)
- {
- DBUG_ASSERT(tsh->db.str && tsh->table_name.str);
- slave_print_msg(ERROR_LEVEL, rli, ER_BINLOG_ROW_WRONG_TABLE_DEF,
- "Table width mismatch - "
- "received %u columns, %s.%s has %u columns",
- m_colcnt, tsh->db.str, tsh->table_name.str, tsh->fields);
- }
- else
- {
- DBUG_ASSERT(col < m_colcnt && col < tsh->fields);
- DBUG_ASSERT(tsh->db.str && tsh->table_name.str);
- slave_print_msg(ERROR_LEVEL, rli, ER_BINLOG_ROW_WRONG_TABLE_DEF,
- "Column %d type mismatch - "
- "received type %d, %s.%s has type %d",
- col, m_coltype[col], tsh->db.str, tsh->table_name.str,
- m_table->field[col]->type());
- }
-
thd->query_error= 1;
DBUG_RETURN(ERR_BAD_TABLE_DEF);
}
@@ -6085,6 +6076,25 @@ void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
}
#endif
+#ifndef DBUG_OFF
+static void
+print_column_values(char const *text, THD *thd, TABLE *table)
+{
+ THD *old_thd= table->in_use;
+ if (table->in_use == NULL)
+ table->in_use= thd;
+ for (Field **fptr= table->field ; *fptr ; ++fptr)
+ {
+ char buf[MAX_FIELD_WIDTH];
+ String str(buf, sizeof(buf), system_charset_info);
+ (*fptr)->val_str(&str);
+ DBUG_PRINT("info", ("%s for column %d is '%s'",
+ text, fptr - table->field, str.c_ptr()));
+ }
+ table->in_use= old_thd;
+}
+#endif
+
/**************************************************************************
Write_rows_log_event member functions
**************************************************************************/
@@ -6169,19 +6179,22 @@ int Write_rows_log_event::do_after_row_operations(TABLE *table, int error)
return error;
}
-char const *Write_rows_log_event::do_prepare_row(THD *thd, TABLE *table,
- char const *row_start)
+int Write_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO *rli,
+ TABLE *table,
+ char const *row_start,
+ char const **row_end)
{
- char const *ptr= row_start;
DBUG_ASSERT(table != NULL);
- /*
- This assertion actually checks that there is at least as many
- columns on the slave as on the master.
- */
- DBUG_ASSERT(table->s->fields >= m_width);
- DBUG_ASSERT(ptr);
- ptr= unpack_row(table, (byte*)table->record[0], ptr, &m_cols);
- return ptr;
+ DBUG_ASSERT(row_start && row_end);
+
+ int error;
+ error= unpack_row(rli,
+ table, m_width, (byte*)table->record[0],
+ row_start, &m_cols, row_end, &m_master_reclength);
+#ifndef DBUG_OFF
+ print_column_values("Unpacked value", thd, table);
+#endif
+ return error;
}
/*
@@ -6237,24 +6250,33 @@ namespace {
@param thd Thread context for writing the record.
@param table Table to which record should be written.
-
+ @param master_reclength
+ Offset to first column that is not present on the master,
+ alternatively the length of the record on the master side.
@return Error code on failure, 0 on success.
*/
static int
-replace_record(THD *thd, TABLE *table)
+replace_record(THD *thd, TABLE *table,
+ ulong const master_reclength,
+ uint const master_fields)
{
+ DBUG_ENTER("replace_record");
DBUG_ASSERT(table != NULL && thd != NULL);
int error;
int keynum;
auto_afree_ptr<char> key(NULL);
+#ifndef DBUG_OFF
+ print_column_values("Starting write value", thd, table);
+#endif
+
while ((error= table->file->ha_write_row(table->record[0])))
{
if ((keynum= table->file->get_dup_key(error)) < 0)
{
/* We failed to retrieve the duplicate key */
- return HA_ERR_FOUND_DUPP_KEY;
+ DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
}
/*
@@ -6271,20 +6293,20 @@ replace_record(THD *thd, TABLE *table)
{
error= table->file->rnd_pos(table->record[1], table->file->dupp_ref);
if (error)
- return error;
+ DBUG_RETURN(error);
}
else
{
if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
{
- return my_errno;
+ DBUG_RETURN(my_errno);
}
if (key.get() == NULL)
{
key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
if (key.get() == NULL)
- return ENOMEM;
+ DBUG_RETURN(ENOMEM);
}
key_copy((byte*)key.get(), table->record[0], table->key_info + keynum, 0);
@@ -6293,7 +6315,7 @@ replace_record(THD *thd, TABLE *table)
table->key_info[keynum].key_length,
HA_READ_KEY_EXACT);
if (error)
- return error;
+ DBUG_RETURN(error);
}
/*
@@ -6301,6 +6323,59 @@ replace_record(THD *thd, TABLE *table)
will enable us to update it or, alternatively, delete it (so
that we can insert the new row afterwards).
+ First we copy the columns into table->record[0] that are not
+ present on the master from table->record[1], if there are any.
+ */
+
+ DBUG_PRINT("info", ("Copying to %p from offset %u to %u",
+ table->record[0],
+ master_reclength, table->s->reclength));
+#ifndef DBUG_OFF
+ print_column_values("After copy value", thd, table);
+#endif
+ if (master_reclength < table->s->reclength)
+ bmove_align(table->record[0] + master_reclength,
+ table->record[1] + master_reclength,
+ table->s->reclength - master_reclength);
+
+ /*
+ Bit columns are special. We iterate over all the remaining
+ columns and copy the "extra" bits to the new record. This is
+ not a very good solution: it should be refactored on
+ opportunity.
+
+ REFACTORING SUGGESTION (Matz). Introduce a member function
+ similar to move_field_offset() called copy_field_offset() to
+ copy field values and implement it for all Field subclasses. Use
+ this function to copy data from the found record to the record
+ that are going to be inserted.
+
+ The copy_field_offset() function need to be a virtual function,
+ which in this case will prevent copying an entire range of
+ fields efficiently.
+ */
+ {
+ Field **field_ptr= table->field + master_fields;
+ for ( ; *field_ptr ; ++field_ptr)
+ {
+ switch ((*field_ptr)->real_type())
+ {
+ default:
+ /* Nothing to do */
+ break;
+
+ case FIELD_TYPE_BIT:
+ Field_bit *f= static_cast<Field_bit*>(*field_ptr);
+ my_ptrdiff_t const offset= table->record[1] - table->record[0];
+ uchar const bits=
+ get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
+ set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
+ break;
+ }
+ }
+ }
+
+ /*
REPLACE is defined as either INSERT or DELETE + INSERT. If
possible, we can replace it with an UPDATE, but that will not
work on InnoDB if FOREIGN KEY checks are necessary.
@@ -6320,22 +6395,22 @@ replace_record(THD *thd, TABLE *table)
{
error=table->file->ha_update_row(table->record[1],
table->record[0]);
- return error;
+ DBUG_RETURN(error);
}
else
{
if ((error= table->file->ha_delete_row(table->record[1])))
- return error;
+ DBUG_RETURN(error);
/* Will retry ha_write_row() with the offending row removed. */
}
}
- return error;
+ DBUG_RETURN(error);
}
int Write_rows_log_event::do_exec_row(TABLE *table)
{
DBUG_ASSERT(table != NULL);
- int error= replace_record(thd, table);
+ int error= replace_record(thd, table, m_master_reclength, m_width);
return error;
}
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
@@ -6606,20 +6681,22 @@ int Delete_rows_log_event::do_after_row_operations(TABLE *table, int error)
return error;
}
-char const *Delete_rows_log_event::do_prepare_row(THD *thd, TABLE *table,
- char const *row_start)
+int Delete_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO *rli,
+ TABLE *table,
+ char const *row_start,
+ char const **row_end)
{
- char const *ptr= row_start;
- DBUG_ASSERT(ptr);
+ int error;
+ DBUG_ASSERT(row_start && row_end);
/*
This assertion actually checks that there is at least as many
columns on the slave as on the master.
*/
DBUG_ASSERT(table->s->fields >= m_width);
- DBUG_ASSERT(ptr != NULL);
- ptr= unpack_row(table, table->record[0], ptr, &m_cols);
-
+ error= unpack_row(rli,
+ table, m_width, table->record[0],
+ row_start, &m_cols, row_end, &m_master_reclength);
/*
If we will access rows using the random access method, m_key will
be set to NULL, so we do not need to make a key copy in that case.
@@ -6631,7 +6708,7 @@ char const *Delete_rows_log_event::do_prepare_row(THD *thd, TABLE *table,
key_copy(m_key, table->record[0], key_info, 0);
}
- return ptr;
+ return error;
}
int Delete_rows_log_event::do_exec_row(TABLE *table)
@@ -6757,11 +6834,13 @@ int Update_rows_log_event::do_after_row_operations(TABLE *table, int error)
return error;
}
-char const *Update_rows_log_event::do_prepare_row(THD *thd, TABLE *table,
- char const *row_start)
+int Update_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO *rli,
+ TABLE *table,
+ char const *row_start,
+ char const **row_end)
{
- char const *ptr= row_start;
- DBUG_ASSERT(ptr);
+ int error;
+ DBUG_ASSERT(row_start && row_end);
/*
This assertion actually checks that there is at least as many
columns on the slave as on the master.
@@ -6769,10 +6848,14 @@ char const *Update_rows_log_event::do_prepare_row(THD *thd, TABLE *table,
DBUG_ASSERT(table->s->fields >= m_width);
/* record[0] is the before image for the update */
- ptr= unpack_row(table, table->record[0], ptr, &m_cols);
- DBUG_ASSERT(ptr != NULL);
+ error= unpack_row(rli,
+ table, m_width, table->record[0],
+ row_start, &m_cols, row_end, &m_master_reclength);
+ row_start = *row_end;
/* m_after_image is the after image for the update */
- ptr= unpack_row(table, m_after_image, ptr, &m_cols);
+ error= unpack_row(rli,
+ table, m_width, m_after_image,
+ row_start, &m_cols, row_end, &m_master_reclength);
/*
If we will access rows using the random access method, m_key will
@@ -6785,7 +6868,7 @@ char const *Update_rows_log_event::do_prepare_row(THD *thd, TABLE *table,
key_copy(m_key, table->record[0], key_info, 0);
}
- return ptr;
+ return error;
}
int Update_rows_log_event::do_exec_row(TABLE *table)