author | rafal@quant.(none) <> | 2007-08-26 14:31:10 +0200
---|---|---
committer | rafal@quant.(none) <> | 2007-08-26 14:31:10 +0200
commit | f8b64e17f94923ef421469d648c31c0f06d2cf96 (patch) |
tree | 81aff1f6b7eb590b3dbf821c2bf569fef289fbf5 /sql/log_event.cc |
parent | c7cf3e05b5464c5cac9f1e8dee7d33f5390e10fb (diff) |
download | mariadb-git-f8b64e17f94923ef421469d648c31c0f06d2cf96.tar.gz |
BUG#21842 (Cluster fails to replicate to innodb or myisam with err 134
using TPC-B):
Problem: An RBR event can contain incomplete row data (only the key value and
the fields which have been changed). In that case, when the row is unpacked
into a record and written to a table, the missing fields get incorrect NULL
values, leading to master-slave inconsistency.
Solution: Use the values found in the slave's table for columns which are not
given in the rows event. The code for writing a single row uses the following
algorithm (a standalone sketch of it appears further below):
1. unpack row_data into table->record[0],
2. try to insert record,
3. if duplicate record found, fetch it into table->record[0],
4. unpack row_data into table->record[0],
5. write table->record[0] into the table.
Where row_data is the row as stored in the data area of a rows event.
Thus:
a) unpacking of row_data happens at the time when the row is written into
the table,
b) when unpacking (in step 4), only the columns present in row_data are
overwritten - all other columns remain as they were found in the table.
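To make the effect of steps 1-5 concrete, here is a minimal standalone C++ sketch of the same insert-or-overwrite logic. It is not server code: the toy Record/PartialRow types, the std::map standing in for the table, and the function names are all invented for illustration; only the order of operations mirrors the algorithm above.

```cpp
// Toy model only (not server code): a "table" keyed by column c0 and a
// partial-row event that carries values for a subset of the columns.
// All names here are invented; only the order of operations follows the
// algorithm described above.
#include <array>
#include <map>
#include <optional>
#include <iostream>

using Record     = std::array<int, 3>;                 // columns c0 (key), c1, c2
using PartialRow = std::array<std::optional<int>, 3>;  // empty = not sent by master

// Steps 1 and 4: copy only the columns present in the event into rec,
// leaving all other columns untouched.
static void unpack_row(const PartialRow& row, Record& rec)
{
    for (std::size_t i = 0; i < row.size(); ++i)
        if (row[i])
            rec[i] = *row[i];
}

static void write_row(std::map<int, Record>& table, const PartialRow& row)
{
    Record rec{};                                      // defaults (prepare_record analogue)
    unpack_row(row, rec);                              // 1. unpack row_data into record[0]

    auto [it, inserted] = table.emplace(rec[0], rec);  // 2. try to insert the record
    if (!inserted)
    {
        rec = it->second;                              // 3. duplicate found: fetch it
        unpack_row(row, rec);                          // 4. re-unpack: only sent columns change
        it->second = rec;                              // 5. write record[0] back to the table
    }
}

int main()
{
    std::map<int, Record> slave_table;
    slave_table[1] = {1, 10, 20};                      // existing slave row

    write_row(slave_table, {1, 99, std::nullopt});     // event updates c1 only

    const Record& r = slave_table[1];
    std::cout << r[0] << ' ' << r[1] << ' ' << r[2] << '\n';  // prints: 1 99 20
}
```

The column absent from the event keeps its slave-side value (20) rather than being reset to a default, which is exactly the inconsistency the fix removes.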
Since all the data needed for the above algorithm is stored inside the
Rows_log_event class, the functions which locate and write rows are turned
into methods of that class:
replace_record() -> Rows_log_event::write_row()
find_and_fetch_row() -> Rows_log_event::find_row()
Both methods take row data from the event's data buffer - the row being
processed is pointed to by m_curr_row. They unpack the data as needed into the
table's record buffers record[0] or record[1]. When a row is unpacked,
m_curr_row_end is set to point at the next row in the data buffer (see the
cursor sketch below).
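A minimal sketch of that cursor arrangement, assuming an invented one-byte-length row layout (the real binlog row format is different); only the roles of m_curr_row, m_curr_row_end and m_rows_end are modelled:

```cpp
// Minimal cursor sketch with an invented row layout (one length byte followed
// by that many payload bytes); the real binlog row format is different.
// Only the roles of m_curr_row, m_curr_row_end and m_rows_end are modelled.
#include <cstdint>
#include <iostream>
#include <vector>

struct RowsBuffer
{
    std::vector<std::uint8_t> data;
    const std::uint8_t* curr_row     = nullptr;   // plays the role of m_curr_row
    const std::uint8_t* curr_row_end = nullptr;   // plays the role of m_curr_row_end
    const std::uint8_t* rows_end     = nullptr;   // plays the role of m_rows_end
};

// Unpacking is the only step that knows how long the current row is, so it is
// also the step that sets curr_row_end -- mirroring how unpack_current_row()
// sets m_curr_row_end as a side effect.
static void unpack_current_row(RowsBuffer& buf)
{
    std::uint8_t len = buf.curr_row[0];
    buf.curr_row_end = buf.curr_row + 1 + len;
    std::cout << "unpacked a row of " << int(len) << " byte(s)\n";
}

int main()
{
    RowsBuffer buf;
    buf.data     = {2, 0xAA, 0xBB,  1, 0xCC,  3, 0x01, 0x02, 0x03};  // three toy rows
    buf.curr_row = buf.data.data();
    buf.rows_end = buf.data.data() + buf.data.size();

    while (buf.curr_row < buf.rows_end)      // row processing loop
    {
        unpack_current_row(buf);             // handles the row, sets curr_row_end
        buf.curr_row = buf.curr_row_end;     // advance the cursor to the next row
    }
}
```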
Other changes introduced in this changeset:
- Change the signature of unpack_row(): don't report errors and don't
set up the table's rw_set here. Errors can happen only when setting default
values in the prepare_record() function and are detected there.
- In Rows_log_event and derived classes, don't pass arguments to
the execution primitives (do_...() member functions); use class
members instead (a schematic of this shape follows the list below).
- Move old row handling code into log_event_old.cc to be used by
*_rows_log_event_old classes.
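Schematically, the second change gives the primitives the following shape. The names below are stand-ins invented for this sketch, not the actual declarations from log_event.h:

```cpp
// Schematic only; the names are stand-ins, not the actual declarations
// from log_event.h. The point is that do_exec_row() reads its inputs
// (table, current row) from members set up by the caller, instead of
// receiving them as arguments.
#include <iostream>
#include <string>

struct RelayLogInfo {};                       // stand-in for RELAY_LOG_INFO

class RowsEvent                               // stand-in for Rows_log_event
{
protected:
    std::string m_table    = "t1";            // state formerly passed as arguments
    const char* m_curr_row = "row #1";

    virtual int do_exec_row(const RelayLogInfo*) = 0;   // no TABLE* / row pointers

public:
    int apply(const RelayLogInfo* rli) { return do_exec_row(rli); }
    virtual ~RowsEvent() = default;
};

class WriteRowsEvent : public RowsEvent       // stand-in for Write_rows_log_event
{
    int do_exec_row(const RelayLogInfo*) override
    {
        std::cout << "writing " << m_curr_row << " into " << m_table << '\n';
        return 0;
    }
};

int main()
{
    RelayLogInfo rli;
    WriteRowsEvent ev;
    return ev.apply(&rli);
}
```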
Also, a new test rpl_ndb_2other is added which tests basic replication
from a master using ndb tables to a slave storing the same tables using
a (possibly) different engine (myisam, innodb).
The test is based on the existing tests rpl_ndb_2myisam and rpl_ndb_2innodb.
However, these tests don't work for various reasons and are currently
disabled (see BUG#19227).
The new test differs from the ones it is based on as follows:
1. A single test covers replication with different storage engines on the
slave (myisam, innodb, ndb).
2. The include file extra/rpl_tests/rpl_ndb_2multi_eng.test containing the
original tests is replaced by extra/rpl_tests/rpl_ndb_2multi_basic.test,
which doesn't contain tests using partitioned tables as these don't work
currently. Instead, it tests replication to a slave which has more or
fewer columns than the master.
3. The include file include/rpl_multi_engine3.inc is replaced with
include/rpl_multi_engine2.inc. The latter differs by performing slightly
different operations (updating more than one row in the table) and by
clearing the table with a "TRUNCATE TABLE" statement instead of "DELETE FROM",
as replication of "DELETE" doesn't work well in this setting.
4. The slave must use the option --log-slave-updates=0, as otherwise execution
of replication events generated by ndb fails if the table uses a different
storage engine on the slave (see BUG#29569).
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 835 |
1 file changed, 364 insertions, 471 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index f71d1bb3622..9f3b4c4e487 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -5654,13 +5654,14 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, m_table_id(tid), m_width(tbl_arg ? tbl_arg->s->fields : 1), m_rows_buf(0), m_rows_cur(0), m_rows_end(0), - m_flags(0) + m_curr_row(NULL), m_curr_row_end(NULL), + m_flags(0), m_key(NULL) { /* We allow a special form of dummy event when the table, and cols are null and the table id is ~0UL. This is a temporary solution, to be able to terminate a started statement in the - binary log: the extreneous events will be removed in the future. + binary log: the extraneous events will be removed in the future. */ DBUG_ASSERT(tbl_arg && tbl_arg->s && tid != ~0UL || !tbl_arg && !cols && tid == ~0UL); @@ -5669,7 +5670,7 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, set_flags(NO_FOREIGN_KEY_CHECKS_F); if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS) set_flags(RELAXED_UNIQUE_CHECKS_F); - /* if bitmap_init fails, catched in is_valid() */ + /* if bitmap_init fails, caught in is_valid() */ if (likely(!bitmap_init(&m_cols, m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL, m_width, @@ -5696,7 +5697,10 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, *description_event) : Log_event(buf, description_event), m_row_count(0), - m_rows_buf(0), m_rows_cur(0), m_rows_end(0) + m_table(NULL), + m_rows_buf(0), m_rows_cur(0), m_rows_end(0), + m_curr_row(NULL), m_curr_row_end(NULL), + m_key(NULL) { DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)"); uint8 const common_header_len= description_event->common_header_len; @@ -5755,7 +5759,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, { DBUG_PRINT("debug", ("Reading from %p", ptr_after_width)); - /* if bitmap_init fails, catched in is_valid() */ + /* if bitmap_init fails, caught in is_valid() */ if (likely(!bitmap_init(&m_cols_ai, m_width <= sizeof(m_bitbuf_ai)*8 ? 
m_bitbuf_ai : NULL, m_width, @@ -5785,6 +5789,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME)); if (likely((bool)m_rows_buf)) { + m_curr_row= m_rows_buf; m_rows_end= m_rows_buf + data_size; m_rows_cur= m_rows_end; memcpy(m_rows_buf, ptr_rows_data, data_size); @@ -5889,7 +5894,6 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { DBUG_ENTER("Rows_log_event::do_apply_event(st_relay_log_info*)"); int error= 0; - uchar const *row_start= m_rows_buf; /* If m_table_id == ~0UL, then we have a dummy event that does not @@ -6049,7 +6053,9 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli) #endif } - TABLE* table= const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(m_table_id); + TABLE* + table= + m_table= const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(m_table_id); if (table) { @@ -6096,22 +6102,41 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli) */ const_cast<RELAY_LOG_INFO*>(rli)->set_flag(RELAY_LOG_INFO::IN_STMT); - error= do_before_row_operations(table); - while (error == 0 && row_start < m_rows_end) - { - uchar const *row_end= NULL; - if ((error= do_prepare_row(thd, rli, table, row_start, &row_end))) - break; // We should perform the after-row operation even in - // the case of error + if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols)) + set_flags(COMPLETE_ROWS_F); - DBUG_ASSERT(row_end != NULL); // cannot happen - DBUG_ASSERT(row_end <= m_rows_end); + /* + Set tables write and read sets. + + Read_set contains all slave columns (in case we are going to fetch + a complete record from slave) + + Write_set equals the m_cols bitmap sent from master but it can be + longer if slave has extra columns. + */ + + DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols); + + bitmap_set_all(table->read_set); + bitmap_set_all(table->write_set); + if (!get_flags(COMPLETE_ROWS_F)) + bitmap_intersect(table->write_set,&m_cols); + // Do event specific preparations + + error= do_before_row_operations(rli); + + // row processing loop + + while (error == 0 && m_curr_row < m_rows_end) + { /* in_use can have been set to NULL in close_tables_for_reopen */ THD* old_thd= table->in_use; if (!table->in_use) table->in_use= thd; - error= do_exec_row(table); + + error= do_exec_row(rli); + table->in_use = old_thd; switch (error) { @@ -6132,21 +6157,38 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli) break; } - row_start= row_end; - } + /* + If m_curr_row_end was not set during event execution (e.g., because + of errors) we can't proceed to the next row. If the error is transient + (i.e., error==0 at this point) we must call unpack_current_row() to set + m_curr_row_end. + */ + + if (!m_curr_row_end && !error) + unpack_current_row(rli); + + // at this moment m_curr_row_end should be set + DBUG_ASSERT(error || m_curr_row_end != NULL); + DBUG_ASSERT(error || m_curr_row < m_curr_row_end); + DBUG_ASSERT(error || m_curr_row_end <= m_rows_end); + + m_curr_row= m_curr_row_end; + + } // row processing loop + DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event", const_cast<RELAY_LOG_INFO*>(rli)->abort_slave= 1;); - error= do_after_row_operations(table, error); + error= do_after_row_operations(rli, error); if (!cache_stmt) { DBUG_PRINT("info", ("Marked that we need to keep log")); thd->options|= OPTION_KEEP_LOG; } - } + } // if (table) /* - We need to delay this clear until the table def is no longer needed. - The table def is needed in unpack_row(). 
+ We need to delay this clear until the table def stored in m_table_def is no + longer needed. It is used in unpack_current_row(). */ if (rli->tables_to_lock && get_flags(STMT_END_F)) const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock(); @@ -6199,7 +6241,7 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli) wait (reached end of last relay log and nothing gets appended there), we timeout after one minute, and notify DBA about the problem. When WL#2975 is implemented, just remove the member - st_relay_log_info::last_event_start_time and all its occurences. + st_relay_log_info::last_event_start_time and all its occurrences. */ const_cast<RELAY_LOG_INFO*>(rli)->last_event_start_time= my_time(0); } @@ -6603,7 +6645,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, else m_data_size+= m_field_metadata_size + 1; - /* If malloc fails, catched in is_valid() */ + /* If malloc fails, caught in is_valid() */ if ((m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME)))) { m_coltype= reinterpret_cast<uchar*>(m_memory); @@ -6708,7 +6750,7 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len, (ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart), m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart))); - /* Allocate mem for all fields in one go. If fails, catched in is_valid() */ + /* Allocate mem for all fields in one go. If fails, caught in is_valid() */ m_memory= (uchar*) my_multi_malloc(MYF(MY_WME), &m_dbnam, (uint) m_dblen + 1, &m_tblnam, (uint) m_tbllen + 1, @@ -7034,7 +7076,8 @@ Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len, #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Write_rows_log_event::do_before_row_operations(TABLE *table) +int +Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) { int error= 0; @@ -7056,26 +7099,26 @@ int Write_rows_log_event::do_before_row_operations(TABLE *table) /* Do not raise the error flag in case of hitting to an unique attribute */ - table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); + m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); /* NDB specific: update from ndb master wrapped as Write_rows */ /* so that the event should be applied to replace slave's row */ - table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); /* NDB specific: if update from ndb master wrapped as Write_rows does not find the row it's assumed idempotent binlog applying is taking place; don't raise the error. */ - table->file->extra(HA_EXTRA_IGNORE_NO_KEY); + m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY); /* TODO: the cluster team (Tomas?) says that it's better if the engine knows how many rows are going to be inserted, then it can allocate needed memory from the start. */ - table->file->ha_start_bulk_insert(0); + m_table->file->ha_start_bulk_insert(0); /* We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill any TIMESTAMP column with data from the row but instead will use @@ -7091,47 +7134,31 @@ int Write_rows_log_event::do_before_row_operations(TABLE *table) some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to analyze if explicit data is provided for slave's TIMESTAMP columns). 
*/ - table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; + m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; return error; } -int Write_rows_log_event::do_after_row_operations(TABLE *table, int error) +int +Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, + int error) { int local_error= 0; - table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); - table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); + m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); + m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); /* reseting the extra with table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); fires bug#27077 todo: explain or fix */ - if ((local_error= table->file->ha_end_bulk_insert())) + if ((local_error= m_table->file->ha_end_bulk_insert())) { - table->file->print_error(local_error, MYF(0)); + m_table->file->print_error(local_error, MYF(0)); } return error? error : local_error; } - -int Write_rows_log_event::do_prepare_row(THD *thd_arg, - RELAY_LOG_INFO const *rli, - TABLE *table, - uchar const *const row_start, - uchar const **const row_end) -{ - DBUG_ASSERT(table != NULL); - DBUG_ASSERT(row_start && row_end); - - if (int error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end, - &m_master_reclength, table->write_set, WRITE_ROWS_EVENT)) - { - thd_arg->net.last_errno= error; - return error; - } - bitmap_copy(table->read_set, table->write_set); - return 0; -} +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) /* Check if there are more UNIQUE keys after the given key. @@ -7145,135 +7172,6 @@ last_uniq_key(TABLE *table, uint keyno) return 1; } -/* Anonymous namespace for template functions/classes */ -namespace { - - /* - Smart pointer that will automatically call my_afree (a macro) when - the pointer goes out of scope. This is used so that I do not have - to remember to call my_afree() before each return. There is no - overhead associated with this, since all functions are inline. - - I (Matz) would prefer to use the free function as a template - parameter, but that is not possible when the "function" is a - macro. - */ - template <class Obj> - class auto_afree_ptr - { - Obj* m_ptr; - public: - auto_afree_ptr(Obj* ptr) : m_ptr(ptr) { } - ~auto_afree_ptr() { if (m_ptr) my_afree(m_ptr); } - void assign(Obj* ptr) { - /* Only to be called if it hasn't been given a value before. */ - DBUG_ASSERT(m_ptr == NULL); - m_ptr= ptr; - } - Obj* get() { return m_ptr; } - }; - -} - - -/* - Copy "extra" columns from record[1] to record[0]. - - Copy the extra fields that are not present on the master but are - present on the slave from record[1] to record[0]. This is used - after fetching a record that are to be updated, either inside - replace_record() or as part of executing an update_row(). - */ -static int -copy_extra_record_fields(TABLE *table, - size_t master_reclength, - my_ptrdiff_t master_fields) -{ - DBUG_ENTER("copy_extra_record_fields(table, master_reclen, master_fields)"); - DBUG_PRINT("info", ("Copying to 0x%lx " - "from field %lu at offset %lu " - "to field %d at offset %lu", - (long) table->record[0], - (ulong) master_fields, (ulong) master_reclength, - table->s->fields, table->s->reclength)); - /* - Copying the extra fields of the slave that does not exist on - master into record[0] (which are basically the default values). 
- */ - - if (table->s->fields < (uint) master_fields) - DBUG_RETURN(0); - - DBUG_ASSERT(master_reclength <= table->s->reclength); - if (master_reclength < table->s->reclength) - bmove_align(table->record[0] + master_reclength, - table->record[1] + master_reclength, - table->s->reclength - master_reclength); - - /* - Bit columns are special. We iterate over all the remaining - columns and copy the "extra" bits to the new record. This is - not a very good solution: it should be refactored on - opportunity. - - REFACTORING SUGGESTION (Matz). Introduce a member function - similar to move_field_offset() called copy_field_offset() to - copy field values and implement it for all Field subclasses. Use - this function to copy data from the found record to the record - that are going to be inserted. - - The copy_field_offset() function need to be a virtual function, - which in this case will prevent copying an entire range of - fields efficiently. - */ - { - Field **field_ptr= table->field + master_fields; - for ( ; *field_ptr ; ++field_ptr) - { - /* - Set the null bit according to the values in record[1] - */ - if ((*field_ptr)->maybe_null() && - (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1]))) - (*field_ptr)->set_null(); - else - (*field_ptr)->set_notnull(); - - /* - Do the extra work for special columns. - */ - switch ((*field_ptr)->real_type()) - { - default: - /* Nothing to do */ - break; - - case MYSQL_TYPE_BIT: - Field_bit *f= static_cast<Field_bit*>(*field_ptr); - if (f->bit_len > 0) - { - my_ptrdiff_t const offset= table->record[1] - table->record[0]; - uchar const bits= - get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len); - set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len); - } - break; - } - } - } - DBUG_RETURN(0); // All OK -} - -#define DBUG_PRINT_BITSET(N,FRM,BS) \ - do { \ - char buf[256]; \ - for (uint i = 0 ; i < (BS)->n_bits ; ++i) \ - buf[i] = bitmap_is_set((BS), i) ? '1' : '0'; \ - buf[(BS)->n_bits] = '\0'; \ - DBUG_PRINT((N), ((FRM), buf)); \ - } while (0) - - /** Check if an error is a duplicate key error. @@ -7299,45 +7197,76 @@ is_duplicate_key_error(int errcode) return false; } +/** + Write the current row into event's table. -/* - Replace the provided record in the database. + The row is located in the row buffer, pointed by @c m_curr_row member. + Number of columns of the row is stored in @c m_width member (it can be + different from the number of columns in the table to which we insert). + Bitmap @c m_cols indicates which columns are present in the row. It is assumed + that event's table is already open and pointed by @c m_table. - SYNOPSIS - replace_record() - thd Thread context for writing the record. - table Table to which record should be written. - master_reclength - Offset to first column that is not present on the master, - alternatively the length of the record on the master - side. + If the same record already exists in the table it can be either overwritten + or an error is reported depending on the value of @c overwrite flag + (error reporting not yet implemented). Note that the matching record can be + different from the row we insert if we use primary keys to identify records in + the table. - RETURN VALUE - Error code on failure, 0 on success. + The row to be inserted can contain values only for selected columns. The + missing columns are filled with default values using @c prepare_record() + function. If a matching record is found in the table and @c overwritte is + true, the missing columns are taken from it. 
- DESCRIPTION - Similar to how it is done in mysql_insert(), we first try to do - a ha_write_row() and of that fails due to duplicated keys (or - indices), we do an ha_update_row() or a ha_delete_row() instead. - */ -static int -replace_record(THD *thd, TABLE *table, - ulong const master_reclength, - uint const master_fields) + @param rli Relay log info (needed for row unpacking). + @param overwrite + Shall we overwrite if the row already exists or signal + error (currently ignored). + + @returns Error code on failure, 0 on success. + + This method, if successful, sets @c m_curr_row_end pointer to point at the + next row in the rows buffer. This is done when unpacking the row to be + inserted. + + @note If a matching record is found, it is either updated using + @c ha_update_row() or first deleted and then new record written. +*/ + +int +Rows_log_event::write_row(const RELAY_LOG_INFO *const rli, + const bool overwrite) { - DBUG_ENTER("replace_record"); - DBUG_ASSERT(table != NULL && thd != NULL); + DBUG_ENTER("write_row"); + DBUG_ASSERT(m_table != NULL && thd != NULL); + TABLE *table= m_table; // pointer to event's table int error; int keynum; auto_afree_ptr<char> key(NULL); + /* fill table->record[0] with default values */ + + if ((error= prepare_record(rli, table, m_width, + TRUE /* check if columns have def. values */))) + DBUG_RETURN(error); + + /* unpack row into table->record[0] */ + error= unpack_current_row(rli); // TODO: how to handle errors? + #ifndef DBUG_OFF DBUG_DUMP("record[0]", table->record[0], table->s->reclength); DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set); DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set); #endif + /* + Try to write record. If a corresponding record already exists in the table, + we try to change it using ha_update_row() if possible. Otherwise we delete + it and repeat the whole process again. + + TODO: Add safety measures against infinite looping. 
+ */ + while ((error= table->file->ha_write_row(table->record[0]))) { if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT) @@ -7347,6 +7276,7 @@ replace_record(THD *thd, TABLE *table, } if ((keynum= table->file->get_dup_key(error)) < 0) { + DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum)); table->file->print_error(error, MYF(0)); /* We failed to retrieve the duplicate key @@ -7368,17 +7298,22 @@ replace_record(THD *thd, TABLE *table, */ if (table->file->ha_table_flags() & HA_DUPLICATE_POS) { + DBUG_PRINT("info",("Locating offending record using rnd_pos()")); error= table->file->rnd_pos(table->record[1], table->file->dup_ref); if (error) { + DBUG_PRINT("info",("rnd_pos() returns error %d",error)); table->file->print_error(error, MYF(0)); DBUG_RETURN(error); } } else { + DBUG_PRINT("info",("Locating offending record using index_read_idx()")); + if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) { + DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE")); DBUG_RETURN(my_errno); } @@ -7386,7 +7321,10 @@ replace_record(THD *thd, TABLE *table, { key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length))); if (key.get() == NULL) + { + DBUG_PRINT("info",("Can't allocate key buffer")); DBUG_RETURN(ENOMEM); + } } key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum, @@ -7397,20 +7335,33 @@ replace_record(THD *thd, TABLE *table, HA_READ_KEY_EXACT); if (error) { + DBUG_PRINT("info",("index_read_idx() returns error %d",error)); table->file->print_error(error, MYF(0)); DBUG_RETURN(error); } } /* - Now, table->record[1] should contain the offending row. That + Now, record[1] should contain the offending row. That will enable us to update it or, alternatively, delete it (so that we can insert the new row afterwards). + */ - First we copy the columns into table->record[0] that are not - present on the master from table->record[1], if there are any. + /* + If row is incomplete we will use the record found to fill + missing columns. */ - copy_extra_record_fields(table, master_reclength, master_fields); + if (!get_flags(COMPLETE_ROWS_F)) + { + restore_record(table,record[1]); + error= unpack_current_row(rli); + } + +#ifndef DBUG_OFF + DBUG_PRINT("debug",("preparing for update: before and after image")); + DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength); + DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength); +#endif /* REPLACE is defined as either INSERT or DELETE + INSERT. 
If @@ -7430,18 +7381,32 @@ replace_record(THD *thd, TABLE *table, if (last_uniq_key(table, keynum) && !table->file->referenced_by_foreign_key()) { + DBUG_PRINT("info",("Updating row using ha_update_row()")); error=table->file->ha_update_row(table->record[1], table->record[0]); - if (error && error != HA_ERR_RECORD_IS_THE_SAME) - table->file->print_error(error, MYF(0)); - else + switch (error) { + + case HA_ERR_RECORD_IS_THE_SAME: + DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from" + " ha_update_row()")); error= 0; + + case 0: + break; + + default: + DBUG_PRINT("info",("ha_update_row() returns error %d",error)); + table->file->print_error(error, MYF(0)); + } + DBUG_RETURN(error); } else { + DBUG_PRINT("info",("Deleting offending row and trying to write new one again")); if ((error= table->file->ha_delete_row(table->record[1]))) { + DBUG_PRINT("info",("ha_delete_row() returns error %d",error)); table->file->print_error(error, MYF(0)); DBUG_RETURN(error); } @@ -7452,12 +7417,20 @@ replace_record(THD *thd, TABLE *table, DBUG_RETURN(error); } -int Write_rows_log_event::do_exec_row(TABLE *table) +#endif + +int +Write_rows_log_event::do_exec_row(const RELAY_LOG_INFO *const rli) { - DBUG_ASSERT(table != NULL); - int error= replace_record(thd, table, m_master_reclength, m_width); - return error; + DBUG_ASSERT(m_table != NULL); + int error= write_row(rli, TRUE /* overwrite */); + + if (error && !thd->net.last_errno) + thd->net.last_errno= error; + + return error; } + #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ #ifdef MYSQL_CLIENT @@ -7550,40 +7523,52 @@ record_compare_exit: return result; } +/** + Locate the current row in event's table. -/* - Find the row given by 'key', if the table has keys, or else use a table scan - to find (and fetch) the row. - - If the engine allows random access of the records, a combination of - position() and rnd_pos() will be used. + The current row is pointed by @c m_curr_row. Member @c m_width tells how many + columns are there in the row (this can be differnet from the number of columns + in the table). It is assumed that event's table is already open and pointed + by @c m_table. - @param table Pointer to table to search - @param key Pointer to key to use for search, if table has key + If a corresponding record is found in the table it is stored in + @c m_table->record[0]. Note that when record is located based on a primary + key, it is possible that the record found differs from the row being located. - @pre <code>table->record[0]</code> shall contain the row to locate - and <code>key</code> shall contain a key to use for searching, if - the engine has a key. + If no key is specified or table does not have keys, a table scan is used to + find the row. In that case the row should be complete and contain values for + all columns. However, it can still be shorter than the table, i.e. the table + can contain extra columns not present in the row. It is also possible that + the table has fewer columns than the row being located. - @post If the return value is zero, <code>table->record[1]</code> - will contain the fetched row and the internal "cursor" will refer to - the row. If the return value is non-zero, - <code>table->record[1]</code> is undefined. In either case, - <code>table->record[0]</code> is undefined. + @returns Error code on failure, 0 on success. + + @post In case of success @c m_table->record[0] contains the record found. + Also, the internal "cursor" of the table is positioned at the record found. 
- @return Zero if the row was successfully fetched into - <code>table->record[1]</code>, error code otherwise. + @note If the engine allows random access of the records, a combination of + @c position() and @c rnd_pos() will be used. */ -static int find_and_fetch_row(TABLE *table, uchar *key) +int Rows_log_event::find_row(const RELAY_LOG_INFO *rli) { - DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)"); - DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx record: 0x%lx", - (long) table, (long) key, (long) table->record[1])); + DBUG_ENTER("find_row"); + + DBUG_ASSERT(m_table && m_table->in_use != NULL); - DBUG_ASSERT(table->in_use != NULL); + TABLE *table= m_table; + int error; + + /* unpack row - missing fields get default values */ + // TODO: shall we check and report errors here? + prepare_record(NULL,table,m_width,FALSE /* don't check errors */); + error= unpack_current_row(rli); + +#ifndef DBUG_OFF + DBUG_PRINT("info",("looking for the following record")); DBUG_DUMP("record[0]", table->record[0], table->s->reclength); +#endif if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && table->s->primary_key < MAX_KEY) @@ -7606,34 +7591,48 @@ static int find_and_fetch_row(TABLE *table, uchar *key) table->s->reclength) == 0); */ + DBUG_PRINT("info",("locating record using primary key (position)")); table->file->position(table->record[0]); - int error= table->file->rnd_pos(table->record[0], table->file->ref); - /* - rnd_pos() returns the record in table->record[0], so we have to - move it to table->record[1]. - */ - bmove_align(table->record[1], table->record[0], table->s->reclength); + error= table->file->rnd_pos(table->record[0], table->file->ref); + if (error) + { + DBUG_PRINT("info",("rnd_pos returns error %d",error)); + table->file->print_error(error, MYF(0)); + } DBUG_RETURN(error); } - /* We need to retrieve all fields */ - /* TODO: Move this out from this function to main loop */ + // We can't use pisition() - try other methods. + + /* + We need to retrieve all fields + TODO: Move this out from this function to main loop + */ table->use_all_columns(); if (table->s->keys > 0) { - int error; + DBUG_PRINT("info",("locating record using primary key (index_read)")); + /* We have a key: search the table using the index */ if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE))) + { + DBUG_PRINT("info",("ha_index_init returns error %d",error)); + table->file->print_error(error, MYF(0)); DBUG_RETURN(error); + } - /* - Don't print debug messages when running valgrind since they can - trigger false warnings. - */ + /* Fill key data for the row */ + + DBUG_ASSERT(m_key); + key_copy(m_key, table->record[0], table->key_info, 0); + + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ #ifndef HAVE_purify - DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength); - DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength); + DBUG_DUMP("key data", m_key, table->key_info->key_length); #endif /* @@ -7645,9 +7644,11 @@ static int find_and_fetch_row(TABLE *table, uchar *key) my_ptrdiff_t const pos= table->s->null_bytes > 0 ? 
table->s->null_bytes - 1 : 0; table->record[1][pos]= 0xFF; - if ((error= table->file->index_read_map(table->record[1], key, HA_WHOLE_KEY, + if ((error= table->file->index_read_map(table->record[1], m_key, + HA_WHOLE_KEY, HA_READ_KEY_EXACT))) { + DBUG_PRINT("info",("no record matching the key found in the table")); table->file->print_error(error, MYF(0)); table->file->ha_index_end(); DBUG_RETURN(error); @@ -7658,8 +7659,8 @@ static int find_and_fetch_row(TABLE *table, uchar *key) trigger false warnings. */ #ifndef HAVE_purify - DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength); - DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength); + DBUG_PRINT("info",("found first matching record")); + DBUG_DUMP("record[0]", table->record[0], table->s->reclength); #endif /* Below is a minor "optimization". If the key (i.e., key number @@ -7681,10 +7682,15 @@ static int find_and_fetch_row(TABLE *table, uchar *key) DBUG_RETURN(0); } + /* + In case key is not unique, we still have to iterate over records found + and find the one which is identical to the row given. The row is unpacked + in record[1] where missing columns are filled with default values. + */ + DBUG_PRINT("info",("non-unique index, scanning it to find matching record")); + while (record_compare(table)) { - int error; - /* We need to set the null bytes to ensure that the filler bit are all set when returning. There are storage engines that @@ -7702,9 +7708,10 @@ static int find_and_fetch_row(TABLE *table, uchar *key) if ((error= table->file->index_next(table->record[1]))) { - table->file->print_error(error, MYF(0)); + DBUG_PRINT("info",("no record matching the given row found")); + table->file->print_error(error, MYF(0)); table->file->ha_index_end(); - DBUG_RETURN(error); + DBUG_RETURN(error); } } @@ -7715,44 +7722,57 @@ static int find_and_fetch_row(TABLE *table, uchar *key) } else { + DBUG_PRINT("info",("locating record using table scan (rnd_next)")); + int restart_count= 0; // Number of times scanning has restarted from top - int error; /* We don't have a key: search the table using rnd_next() */ if ((error= table->file->ha_rnd_init(1))) - return error; + { + DBUG_PRINT("info",("error initializing table scan" + " (ha_rnd_init returns %d)",error)); + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } /* Continue until we find the right record or have made a full loop */ do { error= table->file->rnd_next(table->record[1]); - DBUG_DUMP("record[0]", table->record[0], table->s->reclength); - DBUG_DUMP("record[1]", table->record[1], table->s->reclength); - switch (error) { + case 0: + DBUG_DUMP("record found", table->record[0], table->s->reclength); + case HA_ERR_RECORD_DELETED: - break; + break; case HA_ERR_END_OF_FILE: - if (++restart_count < 2) - table->file->ha_rnd_init(1); - break; + if (++restart_count < 2) + table->file->ha_rnd_init(1); + break; default: - table->file->print_error(error, MYF(0)); - DBUG_PRINT("info", ("Record not found")); + DBUG_PRINT("info", ("Failed to get next record" + " (rnd_next returns %d)",error)); + table->file->print_error(error, MYF(0)); table->file->ha_rnd_end(); - DBUG_RETURN(error); + DBUG_RETURN(error); } } while (restart_count < 2 && record_compare(table)); + + /* + Note: above record_compare will take into accout all record fields + which might be incorrect in case a partial row was given in the event + */ /* Have to restart the scan to be able to fetch the next row. */ - DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? 
"not " : "")); + if (restart_count == 2) + DBUG_PRINT("info", ("Record not found")); table->file->ha_rnd_end(); DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0); @@ -7761,6 +7781,7 @@ static int find_and_fetch_row(TABLE *table, uchar *key) DBUG_RETURN(0); } + #endif /* @@ -7772,9 +7793,6 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, MY_BITMAP const *cols, bool is_transactional) : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional) -#ifdef HAVE_REPLICATION - ,m_memory(NULL), m_key(NULL), m_after_image(NULL) -#endif { } #endif /* #if !defined(MYSQL_CLIENT) */ @@ -7786,23 +7804,18 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg, Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len, const Format_description_log_event *description_event) -#if defined(MYSQL_CLIENT) : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event) -#else - : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event), - m_memory(NULL), m_key(NULL), m_after_image(NULL) -#endif { } #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Delete_rows_log_event::do_before_row_operations(TABLE *table) -{ - DBUG_ASSERT(m_memory == NULL); - if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && - table->s->primary_key < MAX_KEY) +int +Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) +{ + if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && + m_table->s->primary_key < MAX_KEY) { /* We don't need to allocate any memory for m_after_image and @@ -7811,82 +7824,39 @@ int Delete_rows_log_event::do_before_row_operations(TABLE *table) return 0; } - int error= 0; - - if (table->s->keys > 0) - { - m_memory= (uchar*) my_multi_malloc(MYF(MY_WME), - &m_after_image, - (uint) table->s->reclength, - &m_key, - (uint) table->key_info->key_length, - NullS); - } - else + if (m_table->s->keys > 0) { - m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME)); - m_memory= (uchar*)m_after_image; - m_key= NULL; + // Allocate buffer for key searches + m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME)); + if (!m_key) + return HA_ERR_OUT_OF_MEM; } - if (!m_memory) - return HA_ERR_OUT_OF_MEM; - - return error; + return 0; } -int Delete_rows_log_event::do_after_row_operations(TABLE *table, int error) +int +Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, + int error) { /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ - table->file->ha_index_or_rnd_end(); - my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc - m_memory= NULL; - m_after_image= NULL; + m_table->file->ha_index_or_rnd_end(); + my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); m_key= NULL; return error; } -int Delete_rows_log_event::do_prepare_row(THD *thd_arg, - RELAY_LOG_INFO const *rli, - TABLE *table, - uchar const *const row_start, - uchar const **const row_end) -{ - DBUG_ASSERT(row_start && row_end); - if (int error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end, - &m_master_reclength, table->read_set, DELETE_ROWS_EVENT)) - { - thd_arg->net.last_errno= error; - return error; - } - - /* - If we will access rows using the random access method, m_key will - be set to NULL, so we do not need to make a key copy in that case. 
- */ - if (m_key) - { - KEY *const key_info= table->key_info; - - key_copy(m_key, table->record[0], key_info, 0); - } - - return 0; -} - -int Delete_rows_log_event::do_exec_row(TABLE *table) +int Delete_rows_log_event::do_exec_row(const RELAY_LOG_INFO *const rli) { int error; - DBUG_ASSERT(table != NULL); + DBUG_ASSERT(m_table != NULL); - if (!(error= find_and_fetch_row(table, m_key))) + if (!(error= find_row(rli))) { /* - Now we should have the right row to delete. We are using - record[0] since it is guaranteed to point to a record with the - correct value. + Delete the record found, located in record[0] */ - error= table->file->ha_delete_row(table->record[0]); + error= m_table->file->ha_delete_row(m_table->record[0]); } return error; } @@ -7916,10 +7886,6 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg, MY_BITMAP const *cols_ai, bool is_transactional) : Rows_log_event(thd_arg, tbl_arg, tid, cols_bi, is_transactional) -#ifdef HAVE_REPLICATION - , m_memory(NULL), m_key(NULL) - -#endif { init(cols_ai); } @@ -7929,16 +7895,13 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg, MY_BITMAP const *cols, bool is_transactional) : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional) -#ifdef HAVE_REPLICATION - , m_memory(NULL), m_key(NULL) -#endif { init(cols); } void Update_rows_log_event::init(MY_BITMAP const *cols) { - /* if bitmap_init fails, catched in is_valid() */ + /* if bitmap_init fails, caught in is_valid() */ if (likely(!bitmap_init(&m_cols_ai, m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL, m_width, @@ -7971,157 +7934,87 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len, const Format_description_log_event *description_event) -#if defined(MYSQL_CLIENT) : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event) -#else - : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event), - m_memory(NULL), m_key(NULL) -#endif { } #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Update_rows_log_event::do_before_row_operations(TABLE *table) -{ - DBUG_ASSERT(m_memory == NULL); - - int error= 0; - if (table->s->keys > 0) - { - m_memory= (uchar*) my_multi_malloc(MYF(MY_WME), - &m_after_image, - (uint) table->s->reclength, - &m_key, - (uint) table->key_info->key_length, - NullS); - } - else +int +Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) +{ + if (m_table->s->keys > 0) { - m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME)); - m_memory= m_after_image; - m_key= NULL; + // Allocate buffer for key searches + m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME)); + if (!m_key) + return HA_ERR_OUT_OF_MEM; } - if (!m_memory) - return HA_ERR_OUT_OF_MEM; - table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; + m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; - return error; + return 0; } -int Update_rows_log_event::do_after_row_operations(TABLE *table, int error) +int +Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, + int error) { - /* - error= ToDo:find out what this should really be, this triggers - close_scan in nbd, returning error? 
- */ - table->file->ha_index_or_rnd_end(); - my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); - m_memory= NULL; - m_after_image= NULL; + /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ + m_table->file->ha_index_or_rnd_end(); + my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc m_key= NULL; return error; } -int Update_rows_log_event::do_prepare_row(THD *thd_arg, - RELAY_LOG_INFO const *rli, - TABLE *table, - uchar const *const row_start, - uchar const **const row_end) +int +Update_rows_log_event::do_exec_row(const RELAY_LOG_INFO *const rli) { - int error; - DBUG_ASSERT(row_start && row_end); - /* - We need to perform some juggling below since unpack_row() always - unpacks into table->record[0]. For more information, see the - comments for unpack_row(). - */ + DBUG_ASSERT(m_table != NULL); - /* record[0] is the before image for the update */ - if ((error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end, - &m_master_reclength, table->read_set, - UPDATE_ROWS_EVENT))) - { - thd_arg->net.last_errno= error; - return error; - } - - store_record(table, record[1]); - uchar const *next_start = *row_end; - /* m_after_image is the after image for the update */ - if ((error= unpack_row(rli, table, m_width, next_start, &m_cols_ai, row_end, - &m_master_reclength, table->write_set, - UPDATE_ROWS_EVENT))) - { - thd_arg->net.last_errno= error; + int error= find_row(rli); + if (error) return error; - } - - bmove_align(m_after_image, table->record[0], table->s->reclength); - restore_record(table, record[1]); - - /* - Don't print debug messages when running valgrind since they can - trigger false warnings. - */ -#ifndef HAVE_purify - DBUG_DUMP("record[0]", table->record[0], table->s->reclength); - DBUG_DUMP("m_after_image", m_after_image, table->s->reclength); -#endif /* - If we will access rows using the random access method, m_key will - be set to NULL, so we do not need to make a key copy in that case. - */ - if (m_key) - { - KEY *const key_info= table->key_info; - - key_copy(m_key, table->record[0], key_info, 0); - } + This is the situation after locating BI: - return error; -} + ===|=== before image ====|=== after image ===|=== + ^ ^ + m_curr_row m_curr_row_end -int Update_rows_log_event::do_exec_row(TABLE *table) -{ - DBUG_ASSERT(table != NULL); + BI found in the table is stored in record[0]. We copy it to record[1] + and unpack AI to record[0]. + */ - int error= find_and_fetch_row(table, m_key); - if (error) - return error; + store_record(m_table,record[1]); - /* - We have to ensure that the new record (i.e., the after image) is - in record[0] and the old record (i.e., the before image) is in - record[1]. This since some storage engines require this (for - example, the partition engine). - - Since find_and_fetch_row() puts the fetched record (i.e., the old - record) in record[1], we can keep it there. We put the new record - (i.e., the after image) into record[0], and copy the fields that - are on the slave (i.e., in record[1]) into record[0], effectively - overwriting the default values that where put there by the - unpack_row() function. - */ - bmove_align(table->record[0], m_after_image, table->s->reclength); - copy_extra_record_fields(table, m_master_reclength, m_width); + m_curr_row= m_curr_row_end; + error= unpack_current_row(rli); // this also updates m_curr_row_end /* Now we have the right row to update. The old row (the one we're - looking for) is in record[1] and the new row has is in record[0]. 
- We also have copied the original values already in the slave's - database into the after image delivered from the master. + looking for) is in record[1] and the new row is in record[0]. */ - error= table->file->ha_update_row(table->record[1], table->record[0]); +#ifndef HAVE_purify + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ + DBUG_PRINT("info",("Updating row in table")); + DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength); + DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength); +#endif + + error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]); if (error == HA_ERR_RECORD_IS_THE_SAME) error= 0; return error; } + #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ #ifdef MYSQL_CLIENT |