diff options
Diffstat (limited to 'sql/log_event_old.cc')
-rw-r--r-- | sql/log_event_old.cc | 2907 |
1 files changed, 2907 insertions, 0 deletions
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc new file mode 100644 index 00000000000..68cd2bf4673 --- /dev/null +++ b/sql/log_event_old.cc @@ -0,0 +1,2907 @@ + +#include "mysql_priv.h" +#ifndef MYSQL_CLIENT +#include "rpl_rli.h" +#include "rpl_utility.h" +#endif +#include "log_event_old.h" +#include "rpl_record_old.h" + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + +// Old implementation of do_apply_event() +int +Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, const Relay_log_info *rli) +{ + DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)"); + int error= 0; + THD *thd= ev->thd; + uchar const *row_start= ev->m_rows_buf; + + /* + If m_table_id == ~0UL, then we have a dummy event that does not + contain any data. In that case, we just remove all tables in the + tables_to_lock list, close the thread tables, and return with + success. + */ + if (ev->m_table_id == ~0UL) + { + /* + This one is supposed to be set: just an extra check so that + nothing strange has happened. + */ + DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F)); + + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + close_thread_tables(thd); + thd->clear_error(); + DBUG_RETURN(0); + } + + /* + 'thd' has been set by exec_relay_log_event(), just before calling + do_apply_event(). We still check here to prevent future coding + errors. + */ + DBUG_ASSERT(rli->sql_thd == thd); + + /* + If there is no locks taken, this is the first binrow event seen + after the table map events. We should then lock all the tables + used in the transaction and proceed with execution of the actual + event. + */ + if (!thd->lock) + { + /* + Lock_tables() reads the contents of thd->lex, so they must be + initialized. + + We also call the mysql_reset_thd_for_next_command(), since this + is the logical start of the next "statement". Note that this + call might reset the value of current_stmt_binlog_row_based, so + we need to do any changes to that value after this function. + */ + lex_start(thd); + mysql_reset_thd_for_next_command(thd); + + /* + Check if the slave is set to use SBR. If so, it should switch + to using RBR until the end of the "statement", i.e., next + STMT_END_F or next error. + */ + if (!thd->current_stmt_binlog_row_based && + mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG)) + { + thd->set_current_stmt_binlog_row_based(); + } + + if (simple_open_n_lock_tables(thd, rli->tables_to_lock)) + { + uint actual_error= thd->main_da.sql_errno(); + if (thd->is_slave_error || thd->is_fatal_error) + { + /* + Error reporting borrowed from Query_log_event with many excessive + simplifications (we don't honour --slave-skip-errors) + */ + rli->report(ERROR_LEVEL, actual_error, + "Error '%s' on opening tables", + (actual_error ? thd->main_da.message() : + "unexpected success or fatal error")); + thd->is_slave_error= 1; + } + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + DBUG_RETURN(actual_error); + } + + /* + When the open and locking succeeded, we check all tables to + ensure that they still have the correct type. + + We can use a down cast here since we know that every table added + to the tables_to_lock is a RPL_TABLE_LIST. + */ + + { + RPL_TABLE_LIST *ptr= rli->tables_to_lock; + for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global)) + { + if (ptr->m_tabledef.compatible_with(rli, ptr->table)) + { + mysql_unlock_tables(thd, thd->lock); + thd->lock= 0; + thd->is_slave_error= 1; + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF); + } + } + } + + /* + ... and then we add all the tables to the table map and remove + them from tables to lock. + + We also invalidate the query cache for all the tables, since + they will now be changed. + + TODO [/Matz]: Maybe the query cache should not be invalidated + here? It might be that a table is not changed, even though it + was locked for the statement. We do know that each + Old_rows_log_event contain at least one row, so after processing one + Old_rows_log_event, we can invalidate the query cache for the + associated table. + */ + for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global) + { + const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table); + } +#ifdef HAVE_QUERY_CACHE + query_cache.invalidate_locked_for_write(rli->tables_to_lock); +#endif + } + + TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(ev->m_table_id); + + if (table) + { + /* + table == NULL means that this table should not be replicated + (this was set up by Table_map_log_event::do_apply_event() + which tested replicate-* rules). + */ + + /* + It's not needed to set_time() but + 1) it continues the property that "Time" in SHOW PROCESSLIST shows how + much slave is behind + 2) it will be needed when we allow replication from a table with no + TIMESTAMP column to a table with one. + So we call set_time(), like in SBR. Presently it changes nothing. + */ + thd->set_time((time_t)ev->when); + /* + There are a few flags that are replicated with each row event. + Make sure to set/clear them before executing the main body of + the event. + */ + if (ev->get_flags(Old_rows_log_event::NO_FOREIGN_KEY_CHECKS_F)) + thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS; + else + thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; + + if (ev->get_flags(Old_rows_log_event::RELAXED_UNIQUE_CHECKS_F)) + thd->options|= OPTION_RELAXED_UNIQUE_CHECKS; + else + thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; + /* A small test to verify that objects have consistent types */ + DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); + + /* + Now we are in a statement and will stay in a statement until we + see a STMT_END_F. + + We set this flag here, before actually applying any rows, in + case the SQL thread is stopped and we need to detect that we're + inside a statement and halting abruptly might cause problems + when restarting. + */ + const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT); + + error= do_before_row_operations(table); + while (error == 0 && row_start < ev->m_rows_end) + { + uchar const *row_end= NULL; + if ((error= do_prepare_row(thd, rli, table, row_start, &row_end))) + break; // We should perform the after-row operation even in + // the case of error + + DBUG_ASSERT(row_end != NULL); // cannot happen + DBUG_ASSERT(row_end <= ev->m_rows_end); + + /* in_use can have been set to NULL in close_tables_for_reopen */ + THD* old_thd= table->in_use; + if (!table->in_use) + table->in_use= thd; + error= do_exec_row(table); + table->in_use = old_thd; + switch (error) + { + /* Some recoverable errors */ + case HA_ERR_RECORD_CHANGED: + case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if + tuple does not exist */ + error= 0; + case 0: + break; + + default: + rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), + "Error in %s event: row application failed. %s", + ev->get_type_str(), + thd->is_error() ? thd->main_da.message() : ""); + thd->is_slave_error= 1; + break; + } + + row_start= row_end; + } + DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event", + const_cast<Relay_log_info*>(rli)->abort_slave= 1;); + error= do_after_row_operations(table, error); + if (!ev->cache_stmt) + { + DBUG_PRINT("info", ("Marked that we need to keep log")); + thd->options|= OPTION_KEEP_LOG; + } + } + + /* + We need to delay this clear until the table def is no longer needed. + The table def is needed in unpack_row(). + */ + if (rli->tables_to_lock && ev->get_flags(Old_rows_log_event::STMT_END_F)) + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + + if (error) + { /* error has occured during the transaction */ + rli->report(ERROR_LEVEL, thd->main_da.sql_errno(), + "Error in %s event: error during transaction execution " + "on table %s.%s. %s", + ev->get_type_str(), table->s->db.str, + table->s->table_name.str, + thd->is_error() ? thd->main_da.message() : ""); + + /* + If one day we honour --skip-slave-errors in row-based replication, and + the error should be skipped, then we would clear mappings, rollback, + close tables, but the slave SQL thread would not stop and then may + assume the mapping is still available, the tables are still open... + So then we should clear mappings/rollback/close here only if this is a + STMT_END_F. + For now we code, knowing that error is not skippable and so slave SQL + thread is certainly going to stop. + rollback at the caller along with sbr. + */ + thd->reset_current_stmt_binlog_row_based(); + const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); + thd->is_slave_error= 1; + DBUG_RETURN(error); + } + + /* + This code would ideally be placed in do_update_pos() instead, but + since we have no access to table there, we do the setting of + last_event_start_time here instead. + */ + if (table && (table->s->primary_key == MAX_KEY) && + !ev->cache_stmt && + ev->get_flags(Old_rows_log_event::STMT_END_F) == Old_rows_log_event::RLE_NO_FLAGS) + { + /* + ------------ Temporary fix until WL#2975 is implemented --------- + + This event is not the last one (no STMT_END_F). If we stop now + (in case of terminate_slave_thread()), how will we restart? We + have to restart from Table_map_log_event, but as this table is + not transactional, the rows already inserted will still be + present, and idempotency is not guaranteed (no PK) so we risk + that repeating leads to double insert. So we desperately try to + continue, hope we'll eventually leave this buggy situation (by + executing the final Old_rows_log_event). If we are in a hopeless + wait (reached end of last relay log and nothing gets appended + there), we timeout after one minute, and notify DBA about the + problem. When WL#2975 is implemented, just remove the member + st_relay_log_info::last_event_start_time and all its occurences. + */ + const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0); + } + + DBUG_RETURN(0); +} +#endif + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + +/* + Check if there are more UNIQUE keys after the given key. +*/ +static int +last_uniq_key(TABLE *table, uint keyno) +{ + while (++keyno < table->s->keys) + if (table->key_info[keyno].flags & HA_NOSAME) + return 0; + return 1; +} + + +/* + Compares table->record[0] and table->record[1] + + Returns TRUE if different. +*/ +static bool record_compare(TABLE *table) +{ + /* + Need to set the X bit and the filler bits in both records since + there are engines that do not set it correctly. + + In addition, since MyISAM checks that one hasn't tampered with the + record, it is necessary to restore the old bytes into the record + after doing the comparison. + + TODO[record format ndb]: Remove it once NDB returns correct + records. Check that the other engines also return correct records. + */ + + bool result= FALSE; + uchar saved_x[2], saved_filler[2]; + + if (table->s->null_bytes > 0) + { + for (int i = 0 ; i < 2 ; ++i) + { + saved_x[i]= table->record[i][0]; + saved_filler[i]= table->record[i][table->s->null_bytes - 1]; + table->record[i][0]|= 1U; + table->record[i][table->s->null_bytes - 1]|= + 256U - (1U << table->s->last_null_bit_pos); + } + } + + if (table->s->blob_fields + table->s->varchar_fields == 0) + { + result= cmp_record(table,record[1]); + goto record_compare_exit; + } + + /* Compare null bits */ + if (memcmp(table->null_flags, + table->null_flags+table->s->rec_buff_length, + table->s->null_bytes)) + { + result= TRUE; // Diff in NULL value + goto record_compare_exit; + } + + /* Compare updated fields */ + for (Field **ptr=table->field ; *ptr ; ptr++) + { + if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length)) + { + result= TRUE; + goto record_compare_exit; + } + } + +record_compare_exit: + /* + Restore the saved bytes. + + TODO[record format ndb]: Remove this code once NDB returns the + correct record format. + */ + if (table->s->null_bytes > 0) + { + for (int i = 0 ; i < 2 ; ++i) + { + table->record[i][0]= saved_x[i]; + table->record[i][table->s->null_bytes - 1]= saved_filler[i]; + } + } + + return result; +} + + +/* + Copy "extra" columns from record[1] to record[0]. + + Copy the extra fields that are not present on the master but are + present on the slave from record[1] to record[0]. This is used + after fetching a record that are to be updated, either inside + replace_record() or as part of executing an update_row(). + */ +static int +copy_extra_record_fields(TABLE *table, + size_t master_reclength, + my_ptrdiff_t master_fields) +{ + DBUG_ENTER("copy_extra_record_fields(table, master_reclen, master_fields)"); + DBUG_PRINT("info", ("Copying to 0x%lx " + "from field %lu at offset %lu " + "to field %d at offset %lu", + (long) table->record[0], + (ulong) master_fields, (ulong) master_reclength, + table->s->fields, table->s->reclength)); + /* + Copying the extra fields of the slave that does not exist on + master into record[0] (which are basically the default values). + */ + + if (table->s->fields < (uint) master_fields) + DBUG_RETURN(0); + + DBUG_ASSERT(master_reclength <= table->s->reclength); + if (master_reclength < table->s->reclength) + bmove_align(table->record[0] + master_reclength, + table->record[1] + master_reclength, + table->s->reclength - master_reclength); + + /* + Bit columns are special. We iterate over all the remaining + columns and copy the "extra" bits to the new record. This is + not a very good solution: it should be refactored on + opportunity. + + REFACTORING SUGGESTION (Matz). Introduce a member function + similar to move_field_offset() called copy_field_offset() to + copy field values and implement it for all Field subclasses. Use + this function to copy data from the found record to the record + that are going to be inserted. + + The copy_field_offset() function need to be a virtual function, + which in this case will prevent copying an entire range of + fields efficiently. + */ + { + Field **field_ptr= table->field + master_fields; + for ( ; *field_ptr ; ++field_ptr) + { + /* + Set the null bit according to the values in record[1] + */ + if ((*field_ptr)->maybe_null() && + (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1]))) + (*field_ptr)->set_null(); + else + (*field_ptr)->set_notnull(); + + /* + Do the extra work for special columns. + */ + switch ((*field_ptr)->real_type()) + { + default: + /* Nothing to do */ + break; + + case MYSQL_TYPE_BIT: + Field_bit *f= static_cast<Field_bit*>(*field_ptr); + if (f->bit_len > 0) + { + my_ptrdiff_t const offset= table->record[1] - table->record[0]; + uchar const bits= + get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len); + set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len); + } + break; + } + } + } + DBUG_RETURN(0); // All OK +} + + +/* + Replace the provided record in the database. + + SYNOPSIS + replace_record() + thd Thread context for writing the record. + table Table to which record should be written. + master_reclength + Offset to first column that is not present on the master, + alternatively the length of the record on the master + side. + + RETURN VALUE + Error code on failure, 0 on success. + + DESCRIPTION + Similar to how it is done in mysql_insert(), we first try to do + a ha_write_row() and of that fails due to duplicated keys (or + indices), we do an ha_update_row() or a ha_delete_row() instead. + */ +static int +replace_record(THD *thd, TABLE *table, + ulong const master_reclength, + uint const master_fields) +{ + DBUG_ENTER("replace_record"); + DBUG_ASSERT(table != NULL && thd != NULL); + + int error; + int keynum; + auto_afree_ptr<char> key(NULL); + +#ifndef DBUG_OFF + DBUG_DUMP("record[0]", table->record[0], table->s->reclength); + DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set); + DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set); +#endif + + while ((error= table->file->ha_write_row(table->record[0]))) + { + if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT) + { + table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */ + DBUG_RETURN(error); + } + if ((keynum= table->file->get_dup_key(error)) < 0) + { + table->file->print_error(error, MYF(0)); + /* + We failed to retrieve the duplicate key + - either because the error was not "duplicate key" error + - or because the information which key is not available + */ + DBUG_RETURN(error); + } + + /* + We need to retrieve the old row into record[1] to be able to + either update or delete the offending record. We either: + + - use rnd_pos() with a row-id (available as dupp_row) to the + offending row, if that is possible (MyISAM and Blackhole), or else + + - use index_read_idx() with the key that is duplicated, to + retrieve the offending row. + */ + if (table->file->ha_table_flags() & HA_DUPLICATE_POS) + { + error= table->file->rnd_pos(table->record[1], table->file->dup_ref); + if (error) + { + DBUG_PRINT("info",("rnd_pos() returns error %d",error)); + if (error == HA_ERR_RECORD_DELETED) + error= HA_ERR_KEY_NOT_FOUND; + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + } + else + { + if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) + { + DBUG_RETURN(my_errno); + } + + if (key.get() == NULL) + { + key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length))); + if (key.get() == NULL) + DBUG_RETURN(ENOMEM); + } + + key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum, + 0); + error= table->file->index_read_idx_map(table->record[1], keynum, + (const uchar*)key.get(), + HA_WHOLE_KEY, + HA_READ_KEY_EXACT); + if (error) + { + DBUG_PRINT("info", ("index_read_idx() returns error %d", error)); + if (error == HA_ERR_RECORD_DELETED) + error= HA_ERR_KEY_NOT_FOUND; + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + } + + /* + Now, table->record[1] should contain the offending row. That + will enable us to update it or, alternatively, delete it (so + that we can insert the new row afterwards). + + First we copy the columns into table->record[0] that are not + present on the master from table->record[1], if there are any. + */ + copy_extra_record_fields(table, master_reclength, master_fields); + + /* + REPLACE is defined as either INSERT or DELETE + INSERT. If + possible, we can replace it with an UPDATE, but that will not + work on InnoDB if FOREIGN KEY checks are necessary. + + I (Matz) am not sure of the reason for the last_uniq_key() + check as, but I'm guessing that it's something along the + following lines. + + Suppose that we got the duplicate key to be a key that is not + the last unique key for the table and we perform an update: + then there might be another key for which the unique check will + fail, so we're better off just deleting the row and inserting + the correct row. + */ + if (last_uniq_key(table, keynum) && + !table->file->referenced_by_foreign_key()) + { + error=table->file->ha_update_row(table->record[1], + table->record[0]); + if (error && error != HA_ERR_RECORD_IS_THE_SAME) + table->file->print_error(error, MYF(0)); + else + error= 0; + DBUG_RETURN(error); + } + else + { + if ((error= table->file->ha_delete_row(table->record[1]))) + { + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + /* Will retry ha_write_row() with the offending row removed. */ + } + } + + DBUG_RETURN(error); +} + + +/** + Find the row given by 'key', if the table has keys, or else use a table scan + to find (and fetch) the row. + + If the engine allows random access of the records, a combination of + position() and rnd_pos() will be used. + + @param table Pointer to table to search + @param key Pointer to key to use for search, if table has key + + @pre <code>table->record[0]</code> shall contain the row to locate + and <code>key</code> shall contain a key to use for searching, if + the engine has a key. + + @post If the return value is zero, <code>table->record[1]</code> + will contain the fetched row and the internal "cursor" will refer to + the row. If the return value is non-zero, + <code>table->record[1]</code> is undefined. In either case, + <code>table->record[0]</code> is undefined. + + @return Zero if the row was successfully fetched into + <code>table->record[1]</code>, error code otherwise. + */ + +static int find_and_fetch_row(TABLE *table, uchar *key) +{ + DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)"); + DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx record: 0x%lx", + (long) table, (long) key, (long) table->record[1])); + + DBUG_ASSERT(table->in_use != NULL); + + DBUG_DUMP("record[0]", table->record[0], table->s->reclength); + + if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && + table->s->primary_key < MAX_KEY) + { + /* + Use a more efficient method to fetch the record given by + table->record[0] if the engine allows it. We first compute a + row reference using the position() member function (it will be + stored in table->file->ref) and the use rnd_pos() to position + the "cursor" (i.e., record[0] in this case) at the correct row. + + TODO: Add a check that the correct record has been fetched by + comparing with the original record. Take into account that the + record on the master and slave can be of different + length. Something along these lines should work: + + ADD>>> store_record(table,record[1]); + int error= table->file->rnd_pos(table->record[0], table->file->ref); + ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0], + table->s->reclength) == 0); + + */ + table->file->position(table->record[0]); + int error= table->file->rnd_pos(table->record[0], table->file->ref); + /* + rnd_pos() returns the record in table->record[0], so we have to + move it to table->record[1]. + */ + bmove_align(table->record[1], table->record[0], table->s->reclength); + DBUG_RETURN(error); + } + + /* We need to retrieve all fields */ + /* TODO: Move this out from this function to main loop */ + table->use_all_columns(); + + if (table->s->keys > 0) + { + int error; + /* We have a key: search the table using the index */ + if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE))) + DBUG_RETURN(error); + + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ +#ifndef HAVE_purify + DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength); + DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength); +#endif + + /* + We need to set the null bytes to ensure that the filler bit are + all set when returning. There are storage engines that just set + the necessary bits on the bytes and don't set the filler bits + correctly. + */ + my_ptrdiff_t const pos= + table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0; + table->record[1][pos]= 0xFF; + if ((error= table->file->index_read_map(table->record[1], key, HA_WHOLE_KEY, + HA_READ_KEY_EXACT))) + { + table->file->print_error(error, MYF(0)); + table->file->ha_index_end(); + DBUG_RETURN(error); + } + + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ +#ifndef HAVE_purify + DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength); + DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength); +#endif + /* + Below is a minor "optimization". If the key (i.e., key number + 0) has the HA_NOSAME flag set, we know that we have found the + correct record (since there can be no duplicates); otherwise, we + have to compare the record with the one found to see if it is + the correct one. + + CAVEAT! This behaviour is essential for the replication of, + e.g., the mysql.proc table since the correct record *shall* be + found using the primary key *only*. There shall be no + comparison of non-PK columns to decide if the correct record is + found. I can see no scenario where it would be incorrect to + chose the row to change only using a PK or an UNNI. + */ + if (table->key_info->flags & HA_NOSAME) + { + table->file->ha_index_end(); + DBUG_RETURN(0); + } + + while (record_compare(table)) + { + int error; + + /* + We need to set the null bytes to ensure that the filler bit + are all set when returning. There are storage engines that + just set the necessary bits on the bytes and don't set the + filler bits correctly. + + TODO[record format ndb]: Remove this code once NDB returns the + correct record format. + */ + if (table->s->null_bytes > 0) + { + table->record[1][table->s->null_bytes - 1]|= + 256U - (1U << table->s->last_null_bit_pos); + } + + while ((error= table->file->index_next(table->record[1]))) + { + /* We just skip records that has already been deleted */ + if (error == HA_ERR_RECORD_DELETED) + continue; + table->file->print_error(error, MYF(0)); + table->file->ha_index_end(); + DBUG_RETURN(error); + } + } + + /* + Have to restart the scan to be able to fetch the next row. + */ + table->file->ha_index_end(); + } + else + { + int restart_count= 0; // Number of times scanning has restarted from top + int error; + + /* We don't have a key: search the table using rnd_next() */ + if ((error= table->file->ha_rnd_init(1))) + return error; + + /* Continue until we find the right record or have made a full loop */ + do + { + restart_rnd_next: + error= table->file->rnd_next(table->record[1]); + + DBUG_DUMP("record[0]", table->record[0], table->s->reclength); + DBUG_DUMP("record[1]", table->record[1], table->s->reclength); + + switch (error) { + case 0: + break; + + /* + If the record was deleted, we pick the next one without doing + any comparisons. + */ + case HA_ERR_RECORD_DELETED: + goto restart_rnd_next; + + case HA_ERR_END_OF_FILE: + if (++restart_count < 2) + table->file->ha_rnd_init(1); + break; + + default: + table->file->print_error(error, MYF(0)); + DBUG_PRINT("info", ("Record not found")); + table->file->ha_rnd_end(); + DBUG_RETURN(error); + } + } + while (restart_count < 2 && record_compare(table)); + + /* + Have to restart the scan to be able to fetch the next row. + */ + DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : "")); + table->file->ha_rnd_end(); + + DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0); + DBUG_RETURN(error); + } + + DBUG_RETURN(0); +} + + +/********************************************************** + Row handling primitives for Write_rows_log_event_old + **********************************************************/ + +int Write_rows_log_event_old::do_before_row_operations(TABLE *table) +{ + int error= 0; + + /* + We are using REPLACE semantics and not INSERT IGNORE semantics + when writing rows, that is: new rows replace old rows. We need to + inform the storage engine that it should use this behaviour. + */ + + /* Tell the storage engine that we are using REPLACE semantics. */ + thd->lex->duplicates= DUP_REPLACE; + + /* + Pretend we're executing a REPLACE command: this is needed for + InnoDB and NDB Cluster since they are not (properly) checking the + lex->duplicates flag. + */ + thd->lex->sql_command= SQLCOM_REPLACE; + /* + Do not raise the error flag in case of hitting to an unique attribute + */ + table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); + /* + NDB specific: update from ndb master wrapped as Write_rows + */ + /* + so that the event should be applied to replace slave's row + */ + table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + /* + NDB specific: if update from ndb master wrapped as Write_rows + does not find the row it's assumed idempotent binlog applying + is taking place; don't raise the error. + */ + table->file->extra(HA_EXTRA_IGNORE_NO_KEY); + /* + TODO: the cluster team (Tomas?) says that it's better if the engine knows + how many rows are going to be inserted, then it can allocate needed memory + from the start. + */ + table->file->ha_start_bulk_insert(0); + /* + We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill + any TIMESTAMP column with data from the row but instead will use + the event's current time. + As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra + columns, we know that all TIMESTAMP columns on slave will receive explicit + data from the row, so TIMESTAMP_NO_AUTO_SET is ok. + When we allow a table without TIMESTAMP to be replicated to a table having + more columns including a TIMESTAMP column, or when we allow a TIMESTAMP + column to be replicated into a BIGINT column and the slave's table has a + TIMESTAMP column, then the slave's TIMESTAMP column will take its value + from set_time() which we called earlier (consistent with SBR). And then in + some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to + analyze if explicit data is provided for slave's TIMESTAMP columns). + */ + table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; + return error; +} + + +int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error) +{ + int local_error= 0; + table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); + table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); + /* + reseting the extra with + table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); + fires bug#27077 + todo: explain or fix + */ + if ((local_error= table->file->ha_end_bulk_insert())) + { + table->file->print_error(local_error, MYF(0)); + } + return error? error : local_error; +} + + +int +Write_rows_log_event_old::do_prepare_row(THD *thd_arg, + Relay_log_info const *rli, + TABLE *table, + uchar const *row_start, + uchar const **row_end) +{ + DBUG_ASSERT(table != NULL); + DBUG_ASSERT(row_start && row_end); + + int error; + error= unpack_row_old(const_cast<Relay_log_info*>(rli), + table, m_width, table->record[0], + row_start, &m_cols, row_end, &m_master_reclength, + table->write_set, PRE_GA_WRITE_ROWS_EVENT); + bitmap_copy(table->read_set, table->write_set); + return error; +} + + +int Write_rows_log_event_old::do_exec_row(TABLE *table) +{ + DBUG_ASSERT(table != NULL); + int error= replace_record(thd, table, m_master_reclength, m_width); + return error; +} + + +/********************************************************** + Row handling primitives for Delete_rows_log_event_old + **********************************************************/ + +int Delete_rows_log_event_old::do_before_row_operations(TABLE *table) +{ + DBUG_ASSERT(m_memory == NULL); + + if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && + table->s->primary_key < MAX_KEY) + { + /* + We don't need to allocate any memory for m_after_image and + m_key since they are not used. + */ + return 0; + } + + int error= 0; + + if (table->s->keys > 0) + { + m_memory= (uchar*) my_multi_malloc(MYF(MY_WME), + &m_after_image, + (uint) table->s->reclength, + &m_key, + (uint) table->key_info->key_length, + NullS); + } + else + { + m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME)); + m_memory= (uchar*)m_after_image; + m_key= NULL; + } + if (!m_memory) + return HA_ERR_OUT_OF_MEM; + + return error; +} + + +int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error) +{ + /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ + table->file->ha_index_or_rnd_end(); + my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc + m_memory= NULL; + m_after_image= NULL; + m_key= NULL; + + return error; +} + + +int +Delete_rows_log_event_old::do_prepare_row(THD *thd_arg, + Relay_log_info const *rli, + TABLE *table, + uchar const *row_start, + uchar const **row_end) +{ + int error; + DBUG_ASSERT(row_start && row_end); + /* + This assertion actually checks that there is at least as many + columns on the slave as on the master. + */ + DBUG_ASSERT(table->s->fields >= m_width); + + error= unpack_row_old(const_cast<Relay_log_info*>(rli), + table, m_width, table->record[0], + row_start, &m_cols, row_end, &m_master_reclength, + table->read_set, PRE_GA_DELETE_ROWS_EVENT); + /* + If we will access rows using the random access method, m_key will + be set to NULL, so we do not need to make a key copy in that case. + */ + if (m_key) + { + KEY *const key_info= table->key_info; + + key_copy(m_key, table->record[0], key_info, 0); + } + + return error; +} + + +int Delete_rows_log_event_old::do_exec_row(TABLE *table) +{ + int error; + DBUG_ASSERT(table != NULL); + + if (!(error= ::find_and_fetch_row(table, m_key))) + { + /* + Now we should have the right row to delete. We are using + record[0] since it is guaranteed to point to a record with the + correct value. + */ + error= table->file->ha_delete_row(table->record[0]); + } + return error; +} + + +/********************************************************** + Row handling primitives for Update_rows_log_event_old + **********************************************************/ + +int Update_rows_log_event_old::do_before_row_operations(TABLE *table) +{ + DBUG_ASSERT(m_memory == NULL); + + int error= 0; + + if (table->s->keys > 0) + { + m_memory= (uchar*) my_multi_malloc(MYF(MY_WME), + &m_after_image, + (uint) table->s->reclength, + &m_key, + (uint) table->key_info->key_length, + NullS); + } + else + { + m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME)); + m_memory= m_after_image; + m_key= NULL; + } + if (!m_memory) + return HA_ERR_OUT_OF_MEM; + + table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; + + return error; +} + + +int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error) +{ + /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ + table->file->ha_index_or_rnd_end(); + my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); + m_memory= NULL; + m_after_image= NULL; + m_key= NULL; + + return error; +} + + +int Update_rows_log_event_old::do_prepare_row(THD *thd_arg, + Relay_log_info const *rli, + TABLE *table, + uchar const *row_start, + uchar const **row_end) +{ + int error; + DBUG_ASSERT(row_start && row_end); + /* + This assertion actually checks that there is at least as many + columns on the slave as on the master. + */ + DBUG_ASSERT(table->s->fields >= m_width); + + /* record[0] is the before image for the update */ + error= unpack_row_old(const_cast<Relay_log_info*>(rli), + table, m_width, table->record[0], + row_start, &m_cols, row_end, &m_master_reclength, + table->read_set, PRE_GA_UPDATE_ROWS_EVENT); + row_start = *row_end; + /* m_after_image is the after image for the update */ + error= unpack_row_old(const_cast<Relay_log_info*>(rli), + table, m_width, m_after_image, + row_start, &m_cols, row_end, &m_master_reclength, + table->write_set, PRE_GA_UPDATE_ROWS_EVENT); + + DBUG_DUMP("record[0]", table->record[0], table->s->reclength); + DBUG_DUMP("m_after_image", m_after_image, table->s->reclength); + + /* + If we will access rows using the random access method, m_key will + be set to NULL, so we do not need to make a key copy in that case. + */ + if (m_key) + { + KEY *const key_info= table->key_info; + + key_copy(m_key, table->record[0], key_info, 0); + } + + return error; +} + + +int Update_rows_log_event_old::do_exec_row(TABLE *table) +{ + DBUG_ASSERT(table != NULL); + + int error= ::find_and_fetch_row(table, m_key); + if (error) + return error; + + /* + We have to ensure that the new record (i.e., the after image) is + in record[0] and the old record (i.e., the before image) is in + record[1]. This since some storage engines require this (for + example, the partition engine). + + Since find_and_fetch_row() puts the fetched record (i.e., the old + record) in record[1], we can keep it there. We put the new record + (i.e., the after image) into record[0], and copy the fields that + are on the slave (i.e., in record[1]) into record[0], effectively + overwriting the default values that where put there by the + unpack_row() function. + */ + bmove_align(table->record[0], m_after_image, table->s->reclength); + copy_extra_record_fields(table, m_master_reclength, m_width); + + /* + Now we have the right row to update. The old row (the one we're + looking for) is in record[1] and the new row has is in record[0]. + We also have copied the original values already in the slave's + database into the after image delivered from the master. + */ + error= table->file->ha_update_row(table->record[1], table->record[0]); + if (error == HA_ERR_RECORD_IS_THE_SAME) + error= 0; + + return error; +} + +#endif + + +/************************************************************************** + Rows_log_event member functions +**************************************************************************/ + +#ifndef MYSQL_CLIENT +Old_rows_log_event::Old_rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, + MY_BITMAP const *cols, + bool is_transactional) + : Log_event(thd_arg, 0, is_transactional), + m_row_count(0), + m_table(tbl_arg), + m_table_id(tid), + m_width(tbl_arg ? tbl_arg->s->fields : 1), + m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0) +#ifdef HAVE_REPLICATION + , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL) +#endif +{ + + // This constructor should not be reached. + assert(0); + + /* + We allow a special form of dummy event when the table, and cols + are null and the table id is ~0UL. This is a temporary + solution, to be able to terminate a started statement in the + binary log: the extraneous events will be removed in the future. + */ + DBUG_ASSERT(tbl_arg && tbl_arg->s && tid != ~0UL || + !tbl_arg && !cols && tid == ~0UL); + + if (thd_arg->options & OPTION_NO_FOREIGN_KEY_CHECKS) + set_flags(NO_FOREIGN_KEY_CHECKS_F); + if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS) + set_flags(RELAXED_UNIQUE_CHECKS_F); + /* if bitmap_init fails, caught in is_valid() */ + if (likely(!bitmap_init(&m_cols, + m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL, + m_width, + false))) + { + /* Cols can be zero if this is a dummy binrows event */ + if (likely(cols != NULL)) + { + memcpy(m_cols.bitmap, cols->bitmap, no_bytes_in_map(cols)); + create_last_word_mask(&m_cols); + } + } + else + { + // Needed because bitmap_init() does not set it to null on failure + m_cols.bitmap= 0; + } +} +#endif + + +Old_rows_log_event::Old_rows_log_event(const char *buf, uint event_len, + Log_event_type event_type, + const Format_description_log_event + *description_event) + : Log_event(buf, description_event), + m_row_count(0), +#ifndef MYSQL_CLIENT + m_table(NULL), +#endif + m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0) +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL) +#endif +{ + DBUG_ENTER("Old_rows_log_event::Old_Rows_log_event(const char*,...)"); + uint8 const common_header_len= description_event->common_header_len; + uint8 const post_header_len= description_event->post_header_len[event_type-1]; + + DBUG_PRINT("enter",("event_len: %u common_header_len: %d " + "post_header_len: %d", + event_len, common_header_len, + post_header_len)); + + const char *post_start= buf + common_header_len; + DBUG_DUMP("post_header", (uchar*) post_start, post_header_len); + post_start+= RW_MAPID_OFFSET; + if (post_header_len == 6) + { + /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */ + m_table_id= uint4korr(post_start); + post_start+= 4; + } + else + { + m_table_id= (ulong) uint6korr(post_start); + post_start+= RW_FLAGS_OFFSET; + } + + m_flags= uint2korr(post_start); + + uchar const *const var_start= + (const uchar *)buf + common_header_len + post_header_len; + uchar const *const ptr_width= var_start; + uchar *ptr_after_width= (uchar*) ptr_width; + DBUG_PRINT("debug", ("Reading from %p", ptr_after_width)); + m_width = net_field_length(&ptr_after_width); + DBUG_PRINT("debug", ("m_width=%lu", m_width)); + /* if bitmap_init fails, catched in is_valid() */ + if (likely(!bitmap_init(&m_cols, + m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL, + m_width, + false))) + { + DBUG_PRINT("debug", ("Reading from %p", ptr_after_width)); + memcpy(m_cols.bitmap, ptr_after_width, (m_width + 7) / 8); + create_last_word_mask(&m_cols); + ptr_after_width+= (m_width + 7) / 8; + DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); + } + else + { + // Needed because bitmap_init() does not set it to null on failure + m_cols.bitmap= NULL; + DBUG_VOID_RETURN; + } + + const uchar* const ptr_rows_data= (const uchar*) ptr_after_width; + size_t const data_size= event_len - (ptr_rows_data - (const uchar *) buf); + DBUG_PRINT("info",("m_table_id: %lu m_flags: %d m_width: %lu data_size: %lu", + m_table_id, m_flags, m_width, (ulong) data_size)); + DBUG_DUMP("rows_data", (uchar*) ptr_rows_data, data_size); + + m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME)); + if (likely((bool)m_rows_buf)) + { +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + m_curr_row= m_rows_buf; +#endif + m_rows_end= m_rows_buf + data_size; + m_rows_cur= m_rows_end; + memcpy(m_rows_buf, ptr_rows_data, data_size); + } + else + m_cols.bitmap= 0; // to not free it + + DBUG_VOID_RETURN; +} + + +Old_rows_log_event::~Old_rows_log_event() +{ + if (m_cols.bitmap == m_bitbuf) // no my_malloc happened + m_cols.bitmap= 0; // so no my_free in bitmap_free + bitmap_free(&m_cols); // To pair with bitmap_init(). + my_free((uchar*)m_rows_buf, MYF(MY_ALLOW_ZERO_PTR)); +} + + +int Old_rows_log_event::get_data_size() +{ + uchar buf[sizeof(m_width)+1]; + uchar *end= net_store_length(buf, (m_width + 7) / 8); + + DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", + return 6 + no_bytes_in_map(&m_cols) + (end - buf) + + (m_rows_cur - m_rows_buf);); + int data_size= ROWS_HEADER_LEN; + data_size+= no_bytes_in_map(&m_cols); + data_size+= (uint) (end - buf); + + data_size+= (uint) (m_rows_cur - m_rows_buf); + return data_size; +} + + +#ifndef MYSQL_CLIENT +int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length) +{ + /* + When the table has a primary key, we would probably want, by default, to + log only the primary key value instead of the entire "before image". This + would save binlog space. TODO + */ + DBUG_ENTER("Old_rows_log_event::do_add_row_data"); + DBUG_PRINT("enter", ("row_data: 0x%lx length: %lu", (ulong) row_data, + (ulong) length)); + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ +#ifndef HAVE_purify + DBUG_DUMP("row_data", row_data, min(length, 32)); +#endif + + DBUG_ASSERT(m_rows_buf <= m_rows_cur); + DBUG_ASSERT(!m_rows_buf || m_rows_end && m_rows_buf < m_rows_end); + DBUG_ASSERT(m_rows_cur <= m_rows_end); + + /* The cast will always work since m_rows_cur <= m_rows_end */ + if (static_cast<size_t>(m_rows_end - m_rows_cur) <= length) + { + size_t const block_size= 1024; + my_ptrdiff_t const cur_size= m_rows_cur - m_rows_buf; + my_ptrdiff_t const new_alloc= + block_size * ((cur_size + length + block_size - 1) / block_size); + + uchar* const new_buf= (uchar*)my_realloc((uchar*)m_rows_buf, (uint) new_alloc, + MYF(MY_ALLOW_ZERO_PTR|MY_WME)); + if (unlikely(!new_buf)) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + /* If the memory moved, we need to move the pointers */ + if (new_buf != m_rows_buf) + { + m_rows_buf= new_buf; + m_rows_cur= m_rows_buf + cur_size; + } + + /* + The end pointer should always be changed to point to the end of + the allocated memory. + */ + m_rows_end= m_rows_buf + new_alloc; + } + + DBUG_ASSERT(m_rows_cur + length <= m_rows_end); + memcpy(m_rows_cur, row_data, length); + m_rows_cur+= length; + m_row_count++; + DBUG_RETURN(0); +} +#endif + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int Old_rows_log_event::do_apply_event(Relay_log_info const *rli) +{ + DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)"); + int error= 0; + + /* + If m_table_id == ~0UL, then we have a dummy event that does not + contain any data. In that case, we just remove all tables in the + tables_to_lock list, close the thread tables, and return with + success. + */ + if (m_table_id == ~0UL) + { + /* + This one is supposed to be set: just an extra check so that + nothing strange has happened. + */ + DBUG_ASSERT(get_flags(STMT_END_F)); + + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + close_thread_tables(thd); + thd->clear_error(); + DBUG_RETURN(0); + } + + /* + 'thd' has been set by exec_relay_log_event(), just before calling + do_apply_event(). We still check here to prevent future coding + errors. + */ + DBUG_ASSERT(rli->sql_thd == thd); + + /* + If there is no locks taken, this is the first binrow event seen + after the table map events. We should then lock all the tables + used in the transaction and proceed with execution of the actual + event. + */ + if (!thd->lock) + { + bool need_reopen= 1; /* To execute the first lap of the loop below */ + + /* + lock_tables() reads the contents of thd->lex, so they must be + initialized. Contrary to in + Table_map_log_event::do_apply_event() we don't call + mysql_init_query() as that may reset the binlog format. + */ + lex_start(thd); + + while ((error= lock_tables(thd, rli->tables_to_lock, + rli->tables_to_lock_count, &need_reopen))) + { + if (!need_reopen) + { + if (thd->is_slave_error || thd->is_fatal_error) + { + /* + Error reporting borrowed from Query_log_event with many excessive + simplifications (we don't honour --slave-skip-errors) + */ + uint actual_error= thd->net.last_errno; + rli->report(ERROR_LEVEL, actual_error, + "Error '%s' in %s event: when locking tables", + (actual_error ? thd->net.last_error : + "unexpected success or fatal error"), + get_type_str()); + thd->is_fatal_error= 1; + } + else + { + rli->report(ERROR_LEVEL, error, + "Error in %s event: when locking tables", + get_type_str()); + } + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + DBUG_RETURN(error); + } + + /* + So we need to reopen the tables. + + We need to flush the pending RBR event, since it keeps a + pointer to an open table. + + ALTERNATIVE SOLUTION (not implemented): Extract a pointer to + the pending RBR event and reset the table pointer after the + tables has been reopened. + + NOTE: For this new scheme there should be no pending event: + need to add code to assert that is the case. + */ + thd->binlog_flush_pending_rows_event(false); + TABLE_LIST *tables= rli->tables_to_lock; + close_tables_for_reopen(thd, &tables); + + uint tables_count= rli->tables_to_lock_count; + if ((error= open_tables(thd, &tables, &tables_count, 0))) + { + if (thd->is_slave_error || thd->is_fatal_error) + { + /* + Error reporting borrowed from Query_log_event with many excessive + simplifications (we don't honour --slave-skip-errors) + */ + uint actual_error= thd->net.last_errno; + rli->report(ERROR_LEVEL, actual_error, + "Error '%s' on reopening tables", + (actual_error ? thd->net.last_error : + "unexpected success or fatal error")); + thd->is_slave_error= 1; + } + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + DBUG_RETURN(error); + } + } + + /* + When the open and locking succeeded, we check all tables to + ensure that they still have the correct type. + + We can use a down cast here since we know that every table added + to the tables_to_lock is a RPL_TABLE_LIST. + */ + + { + RPL_TABLE_LIST *ptr= rli->tables_to_lock; + for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global)) + { + if (ptr->m_tabledef.compatible_with(rli, ptr->table)) + { + mysql_unlock_tables(thd, thd->lock); + thd->lock= 0; + thd->is_slave_error= 1; + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + DBUG_RETURN(ERR_BAD_TABLE_DEF); + } + } + } + + /* + ... and then we add all the tables to the table map and remove + them from tables to lock. + + We also invalidate the query cache for all the tables, since + they will now be changed. + + TODO [/Matz]: Maybe the query cache should not be invalidated + here? It might be that a table is not changed, even though it + was locked for the statement. We do know that each + Old_rows_log_event contain at least one row, so after processing one + Old_rows_log_event, we can invalidate the query cache for the + associated table. + */ + for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global) + { + const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table); + } +#ifdef HAVE_QUERY_CACHE + query_cache.invalidate_locked_for_write(rli->tables_to_lock); +#endif + } + + TABLE* + table= + m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id); + + if (table) + { + /* + table == NULL means that this table should not be replicated + (this was set up by Table_map_log_event::do_apply_event() + which tested replicate-* rules). + */ + + /* + It's not needed to set_time() but + 1) it continues the property that "Time" in SHOW PROCESSLIST shows how + much slave is behind + 2) it will be needed when we allow replication from a table with no + TIMESTAMP column to a table with one. + So we call set_time(), like in SBR. Presently it changes nothing. + */ + thd->set_time((time_t)when); + /* + There are a few flags that are replicated with each row event. + Make sure to set/clear them before executing the main body of + the event. + */ + if (get_flags(NO_FOREIGN_KEY_CHECKS_F)) + thd->options|= OPTION_NO_FOREIGN_KEY_CHECKS; + else + thd->options&= ~OPTION_NO_FOREIGN_KEY_CHECKS; + + if (get_flags(RELAXED_UNIQUE_CHECKS_F)) + thd->options|= OPTION_RELAXED_UNIQUE_CHECKS; + else + thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS; + /* A small test to verify that objects have consistent types */ + DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); + + /* + Now we are in a statement and will stay in a statement until we + see a STMT_END_F. + + We set this flag here, before actually applying any rows, in + case the SQL thread is stopped and we need to detect that we're + inside a statement and halting abruptly might cause problems + when restarting. + */ + const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT); + + if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols)) + set_flags(COMPLETE_ROWS_F); + + /* + Set tables write and read sets. + + Read_set contains all slave columns (in case we are going to fetch + a complete record from slave) + + Write_set equals the m_cols bitmap sent from master but it can be + longer if slave has extra columns. + */ + + DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols); + + bitmap_set_all(table->read_set); + bitmap_set_all(table->write_set); + if (!get_flags(COMPLETE_ROWS_F)) + bitmap_intersect(table->write_set,&m_cols); + + // Do event specific preparations + + error= do_before_row_operations(rli); + + // row processing loop + + while (error == 0 && m_curr_row < m_rows_end) + { + /* in_use can have been set to NULL in close_tables_for_reopen */ + THD* old_thd= table->in_use; + if (!table->in_use) + table->in_use= thd; + + error= do_exec_row(rli); + + DBUG_PRINT("info", ("error: %d", error)); + DBUG_ASSERT(error != HA_ERR_RECORD_DELETED); + + table->in_use = old_thd; + switch (error) + { + case 0: + break; + + /* Some recoverable errors */ + case HA_ERR_RECORD_CHANGED: + case HA_ERR_KEY_NOT_FOUND: /* Idempotency support: OK if + tuple does not exist */ + error= 0; + break; + + default: + rli->report(ERROR_LEVEL, thd->net.last_errno, + "Error in %s event: row application failed. %s", + get_type_str(), + thd->net.last_error ? thd->net.last_error : ""); + thd->is_slave_error= 1; + break; + } + + /* + If m_curr_row_end was not set during event execution (e.g., because + of errors) we can't proceed to the next row. If the error is transient + (i.e., error==0 at this point) we must call unpack_current_row() to set + m_curr_row_end. + */ + + DBUG_PRINT("info", ("error: %d", error)); + DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu", + (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end)); + + if (!m_curr_row_end && !error) + unpack_current_row(rli); + + // at this moment m_curr_row_end should be set + DBUG_ASSERT(error || m_curr_row_end != NULL); + DBUG_ASSERT(error || m_curr_row < m_curr_row_end); + DBUG_ASSERT(error || m_curr_row_end <= m_rows_end); + + m_curr_row= m_curr_row_end; + + } // row processing loop + + DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event", + const_cast<Relay_log_info*>(rli)->abort_slave= 1;); + error= do_after_row_operations(rli, error); + if (!cache_stmt) + { + DBUG_PRINT("info", ("Marked that we need to keep log")); + thd->options|= OPTION_KEEP_LOG; + } + } // if (table) + + /* + We need to delay this clear until here bacause unpack_current_row() uses + master-side table definitions stored in rli. + */ + if (rli->tables_to_lock && get_flags(STMT_END_F)) + const_cast<Relay_log_info*>(rli)->clear_tables_to_lock(); + + if (error) + { /* error has occured during the transaction */ + rli->report(ERROR_LEVEL, thd->net.last_errno, + "Error in %s event: error during transaction execution " + "on table %s.%s. %s", + get_type_str(), table->s->db.str, + table->s->table_name.str, + thd->net.last_error ? thd->net.last_error : ""); + + /* + If one day we honour --skip-slave-errors in row-based replication, and + the error should be skipped, then we would clear mappings, rollback, + close tables, but the slave SQL thread would not stop and then may + assume the mapping is still available, the tables are still open... + So then we should clear mappings/rollback/close here only if this is a + STMT_END_F. + For now we code, knowing that error is not skippable and so slave SQL + thread is certainly going to stop. + rollback at the caller along with sbr. + */ + thd->reset_current_stmt_binlog_row_based(); + const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); + thd->is_slave_error= 1; + DBUG_RETURN(error); + } + + /* + This code would ideally be placed in do_update_pos() instead, but + since we have no access to table there, we do the setting of + last_event_start_time here instead. + */ + if (table && (table->s->primary_key == MAX_KEY) && + !cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS) + { + /* + ------------ Temporary fix until WL#2975 is implemented --------- + + This event is not the last one (no STMT_END_F). If we stop now + (in case of terminate_slave_thread()), how will we restart? We + have to restart from Table_map_log_event, but as this table is + not transactional, the rows already inserted will still be + present, and idempotency is not guaranteed (no PK) so we risk + that repeating leads to double insert. So we desperately try to + continue, hope we'll eventually leave this buggy situation (by + executing the final Old_rows_log_event). If we are in a hopeless + wait (reached end of last relay log and nothing gets appended + there), we timeout after one minute, and notify DBA about the + problem. When WL#2975 is implemented, just remove the member + Relay_log_info::last_event_start_time and all its occurrences. + */ + const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0); + } + + DBUG_RETURN(0); +} + + +Log_event::enum_skip_reason +Old_rows_log_event::do_shall_skip(Relay_log_info *rli) +{ + /* + If the slave skip counter is 1 and this event does not end a + statement, then we should not start executing on the next event. + Otherwise, we defer the decision to the normal skipping logic. + */ + if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F)) + return Log_event::EVENT_SKIP_IGNORE; + else + return Log_event::do_shall_skip(rli); +} + +int +Old_rows_log_event::do_update_pos(Relay_log_info *rli) +{ + DBUG_ENTER("Old_rows_log_event::do_update_pos"); + int error= 0; + + DBUG_PRINT("info", ("flags: %s", + get_flags(STMT_END_F) ? "STMT_END_F " : "")); + + if (get_flags(STMT_END_F)) + { + /* + This is the end of a statement or transaction, so close (and + unlock) the tables we opened when processing the + Table_map_log_event starting the statement. + + OBSERVER. This will clear *all* mappings, not only those that + are open for the table. There is not good handle for on-close + actions for tables. + + NOTE. Even if we have no table ('table' == 0) we still need to be + here, so that we increase the group relay log position. If we didn't, we + could have a group relay log position which lags behind "forever" + (assume the last master's transaction is ignored by the slave because of + replicate-ignore rules). + */ + thd->binlog_flush_pending_rows_event(true); + + /* + If this event is not in a transaction, the call below will, if some + transactional storage engines are involved, commit the statement into + them and flush the pending event to binlog. + If this event is in a transaction, the call will do nothing, but a + Xid_log_event will come next which will, if some transactional engines + are involved, commit the transaction and flush the pending event to the + binlog. + */ + error= ha_autocommit_or_rollback(thd, 0); + + /* + Now what if this is not a transactional engine? we still need to + flush the pending event to the binlog; we did it with + thd->binlog_flush_pending_rows_event(). Note that we imitate + what is done for real queries: a call to + ha_autocommit_or_rollback() (sometimes only if involves a + transactional engine), and a call to be sure to have the pending + event flushed. + */ + + thd->reset_current_stmt_binlog_row_based(); + rli->cleanup_context(thd, 0); + if (error == 0) + { + /* + Indicate that a statement is finished. + Step the group log position if we are not in a transaction, + otherwise increase the event log position. + */ + rli->stmt_done(log_pos, when); + + /* + Clear any errors pushed in thd->net.client_last_err* if for + example "no key found" (as this is allowed). This is a safety + measure; apparently those errors (e.g. when executing a + Delete_rows_log_event_old of a non-existing row, like in + rpl_row_mystery22.test, thd->net.last_error = "Can't + find record in 't1'" and last_errno=1032) do not become + visible. We still prefer to wipe them out. + */ + thd->clear_error(); + } + else + rli->report(ERROR_LEVEL, error, + "Error in %s event: commit of row events failed, " + "table `%s`.`%s`", + get_type_str(), m_table->s->db.str, + m_table->s->table_name.str); + } + else + { + rli->inc_event_relay_log_pos(); + } + + DBUG_RETURN(error); +} + +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + + +#ifndef MYSQL_CLIENT +bool Old_rows_log_event::write_data_header(IO_CACHE *file) +{ + uchar buf[ROWS_HEADER_LEN]; // No need to init the buffer + + // This method should not be reached. + assert(0); + + DBUG_ASSERT(m_table_id != ~0UL); + DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", + { + int4store(buf + 0, m_table_id); + int2store(buf + 4, m_flags); + return (my_b_safe_write(file, buf, 6)); + }); + int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id); + int2store(buf + RW_FLAGS_OFFSET, m_flags); + return (my_b_safe_write(file, buf, ROWS_HEADER_LEN)); +} + + +bool Old_rows_log_event::write_data_body(IO_CACHE*file) +{ + /* + Note that this should be the number of *bits*, not the number of + bytes. + */ + uchar sbuf[sizeof(m_width)]; + my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf; + + // This method should not be reached. + assert(0); + + bool res= false; + uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width); + DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf)); + + DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf)); + res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf)); + + DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); + res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap, + no_bytes_in_map(&m_cols)); + DBUG_DUMP("rows", m_rows_buf, data_size); + res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size); + + return res; + +} +#endif + + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +void Old_rows_log_event::pack_info(Protocol *protocol) +{ + char buf[256]; + char const *const flagstr= + get_flags(STMT_END_F) ? " flags: STMT_END_F" : ""; + size_t bytes= my_snprintf(buf, sizeof(buf), + "table_id: %lu%s", m_table_id, flagstr); + protocol->store(buf, bytes, &my_charset_bin); +} +#endif + + +#ifdef MYSQL_CLIENT +void Old_rows_log_event::print_helper(FILE *file, + PRINT_EVENT_INFO *print_event_info, + char const *const name) +{ + IO_CACHE *const head= &print_event_info->head_cache; + IO_CACHE *const body= &print_event_info->body_cache; + if (!print_event_info->short_form) + { + bool const last_stmt_event= get_flags(STMT_END_F); + print_header(head, print_event_info, !last_stmt_event); + my_b_printf(head, "\t%s: table id %lu%s\n", + name, m_table_id, + last_stmt_event ? " flags: STMT_END_F" : ""); + print_base64(body, print_event_info, !last_stmt_event); + } + + if (get_flags(STMT_END_F)) + { + copy_event_cache_to_file_and_reinit(head, file); + copy_event_cache_to_file_and_reinit(body, file); + } +} +#endif + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +/** + Write the current row into event's table. + + The row is located in the row buffer, pointed by @c m_curr_row member. + Number of columns of the row is stored in @c m_width member (it can be + different from the number of columns in the table to which we insert). + Bitmap @c m_cols indicates which columns are present in the row. It is assumed + that event's table is already open and pointed by @c m_table. + + If the same record already exists in the table it can be either overwritten + or an error is reported depending on the value of @c overwrite flag + (error reporting not yet implemented). Note that the matching record can be + different from the row we insert if we use primary keys to identify records in + the table. + + The row to be inserted can contain values only for selected columns. The + missing columns are filled with default values using @c prepare_record() + function. If a matching record is found in the table and @c overwritte is + true, the missing columns are taken from it. + + @param rli Relay log info (needed for row unpacking). + @param overwrite + Shall we overwrite if the row already exists or signal + error (currently ignored). + + @returns Error code on failure, 0 on success. + + This method, if successful, sets @c m_curr_row_end pointer to point at the + next row in the rows buffer. This is done when unpacking the row to be + inserted. + + @note If a matching record is found, it is either updated using + @c ha_update_row() or first deleted and then new record written. +*/ + +int +Old_rows_log_event::write_row(const Relay_log_info *const rli, + const bool overwrite) +{ + DBUG_ENTER("write_row"); + DBUG_ASSERT(m_table != NULL && thd != NULL); + + TABLE *table= m_table; // pointer to event's table + int error; + int keynum; + auto_afree_ptr<char> key(NULL); + + /* fill table->record[0] with default values */ + + if ((error= prepare_record(table, m_width, + TRUE /* check if columns have def. values */))) + DBUG_RETURN(error); + + /* unpack row into table->record[0] */ + error= unpack_current_row(rli); // TODO: how to handle errors? + +#ifndef DBUG_OFF + DBUG_DUMP("record[0]", table->record[0], table->s->reclength); + DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set); + DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set); +#endif + + /* + Try to write record. If a corresponding record already exists in the table, + we try to change it using ha_update_row() if possible. Otherwise we delete + it and repeat the whole process again. + + TODO: Add safety measures against infinite looping. + */ + + while ((error= table->file->ha_write_row(table->record[0]))) + { + if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT) + { + table->file->print_error(error, MYF(0)); /* to check at exec_relay_log_event */ + DBUG_RETURN(error); + } + if ((keynum= table->file->get_dup_key(error)) < 0) + { + DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum)); + table->file->print_error(error, MYF(0)); + /* + We failed to retrieve the duplicate key + - either because the error was not "duplicate key" error + - or because the information which key is not available + */ + DBUG_RETURN(error); + } + + /* + We need to retrieve the old row into record[1] to be able to + either update or delete the offending record. We either: + + - use rnd_pos() with a row-id (available as dupp_row) to the + offending row, if that is possible (MyISAM and Blackhole), or else + + - use index_read_idx() with the key that is duplicated, to + retrieve the offending row. + */ + if (table->file->ha_table_flags() & HA_DUPLICATE_POS) + { + DBUG_PRINT("info",("Locating offending record using rnd_pos()")); + error= table->file->rnd_pos(table->record[1], table->file->dup_ref); + if (error) + { + DBUG_PRINT("info",("rnd_pos() returns error %d",error)); + if (error == HA_ERR_RECORD_DELETED) + error= HA_ERR_KEY_NOT_FOUND; + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + } + else + { + DBUG_PRINT("info",("Locating offending record using index_read_idx()")); + + if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) + { + DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE")); + DBUG_RETURN(my_errno); + } + + if (key.get() == NULL) + { + key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length))); + if (key.get() == NULL) + { + DBUG_PRINT("info",("Can't allocate key buffer")); + DBUG_RETURN(ENOMEM); + } + } + + key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum, + 0); + error= table->file->index_read_idx_map(table->record[1], keynum, + (const uchar*)key.get(), + HA_WHOLE_KEY, + HA_READ_KEY_EXACT); + if (error) + { + DBUG_PRINT("info",("index_read_idx() returns error %d", error)); + if (error == HA_ERR_RECORD_DELETED) + error= HA_ERR_KEY_NOT_FOUND; + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + } + + /* + Now, record[1] should contain the offending row. That + will enable us to update it or, alternatively, delete it (so + that we can insert the new row afterwards). + */ + + /* + If row is incomplete we will use the record found to fill + missing columns. + */ + if (!get_flags(COMPLETE_ROWS_F)) + { + restore_record(table,record[1]); + error= unpack_current_row(rli); + } + +#ifndef DBUG_OFF + DBUG_PRINT("debug",("preparing for update: before and after image")); + DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength); + DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength); +#endif + + /* + REPLACE is defined as either INSERT or DELETE + INSERT. If + possible, we can replace it with an UPDATE, but that will not + work on InnoDB if FOREIGN KEY checks are necessary. + + I (Matz) am not sure of the reason for the last_uniq_key() + check as, but I'm guessing that it's something along the + following lines. + + Suppose that we got the duplicate key to be a key that is not + the last unique key for the table and we perform an update: + then there might be another key for which the unique check will + fail, so we're better off just deleting the row and inserting + the correct row. + */ + if (last_uniq_key(table, keynum) && + !table->file->referenced_by_foreign_key()) + { + DBUG_PRINT("info",("Updating row using ha_update_row()")); + error=table->file->ha_update_row(table->record[1], + table->record[0]); + switch (error) { + + case HA_ERR_RECORD_IS_THE_SAME: + DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from" + " ha_update_row()")); + error= 0; + + case 0: + break; + + default: + DBUG_PRINT("info",("ha_update_row() returns error %d",error)); + table->file->print_error(error, MYF(0)); + } + + DBUG_RETURN(error); + } + else + { + DBUG_PRINT("info",("Deleting offending row and trying to write new one again")); + if ((error= table->file->ha_delete_row(table->record[1]))) + { + DBUG_PRINT("info",("ha_delete_row() returns error %d",error)); + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + /* Will retry ha_write_row() with the offending row removed. */ + } + } + + DBUG_RETURN(error); +} + + +/** + Locate the current row in event's table. + + The current row is pointed by @c m_curr_row. Member @c m_width tells how many + columns are there in the row (this can be differnet from the number of columns + in the table). It is assumed that event's table is already open and pointed + by @c m_table. + + If a corresponding record is found in the table it is stored in + @c m_table->record[0]. Note that when record is located based on a primary + key, it is possible that the record found differs from the row being located. + + If no key is specified or table does not have keys, a table scan is used to + find the row. In that case the row should be complete and contain values for + all columns. However, it can still be shorter than the table, i.e. the table + can contain extra columns not present in the row. It is also possible that + the table has fewer columns than the row being located. + + @returns Error code on failure, 0 on success. + + @post In case of success @c m_table->record[0] contains the record found. + Also, the internal "cursor" of the table is positioned at the record found. + + @note If the engine allows random access of the records, a combination of + @c position() and @c rnd_pos() will be used. + */ + +int Old_rows_log_event::find_row(const Relay_log_info *rli) +{ + DBUG_ENTER("find_row"); + + DBUG_ASSERT(m_table && m_table->in_use != NULL); + + TABLE *table= m_table; + int error; + + /* unpack row - missing fields get default values */ + + // TODO: shall we check and report errors here? + prepare_record(table, m_width, FALSE /* don't check errors */); + error= unpack_current_row(rli); + +#ifndef DBUG_OFF + DBUG_PRINT("info",("looking for the following record")); + DBUG_DUMP("record[0]", table->record[0], table->s->reclength); +#endif + + if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && + table->s->primary_key < MAX_KEY) + { + /* + Use a more efficient method to fetch the record given by + table->record[0] if the engine allows it. We first compute a + row reference using the position() member function (it will be + stored in table->file->ref) and the use rnd_pos() to position + the "cursor" (i.e., record[0] in this case) at the correct row. + + TODO: Add a check that the correct record has been fetched by + comparing with the original record. Take into account that the + record on the master and slave can be of different + length. Something along these lines should work: + + ADD>>> store_record(table,record[1]); + int error= table->file->rnd_pos(table->record[0], table->file->ref); + ADD>>> DBUG_ASSERT(memcmp(table->record[1], table->record[0], + table->s->reclength) == 0); + + */ + DBUG_PRINT("info",("locating record using primary key (position)")); + int error= table->file->rnd_pos_by_record(table->record[0]); + if (error) + { + DBUG_PRINT("info",("rnd_pos returns error %d",error)); + if (error == HA_ERR_RECORD_DELETED) + error= HA_ERR_KEY_NOT_FOUND; + table->file->print_error(error, MYF(0)); + } + DBUG_RETURN(error); + } + + // We can't use position() - try other methods. + + /* + We need to retrieve all fields + TODO: Move this out from this function to main loop + */ + table->use_all_columns(); + + /* + Save copy of the record in table->record[1]. It might be needed + later if linear search is used to find exact match. + */ + store_record(table,record[1]); + + if (table->s->keys > 0) + { + DBUG_PRINT("info",("locating record using primary key (index_read)")); + + /* We have a key: search the table using the index */ + if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE))) + { + DBUG_PRINT("info",("ha_index_init returns error %d",error)); + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + + /* Fill key data for the row */ + + DBUG_ASSERT(m_key); + key_copy(m_key, table->record[0], table->key_info, 0); + + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ +#ifndef HAVE_purify + DBUG_DUMP("key data", m_key, table->key_info->key_length); +#endif + + /* + We need to set the null bytes to ensure that the filler bit are + all set when returning. There are storage engines that just set + the necessary bits on the bytes and don't set the filler bits + correctly. + */ + my_ptrdiff_t const pos= + table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0; + table->record[0][pos]= 0xFF; + + if ((error= table->file->index_read_map(table->record[0], m_key, + HA_WHOLE_KEY, + HA_READ_KEY_EXACT))) + { + DBUG_PRINT("info",("no record matching the key found in the table")); + if (error == HA_ERR_RECORD_DELETED) + error= HA_ERR_KEY_NOT_FOUND; + table->file->print_error(error, MYF(0)); + table->file->ha_index_end(); + DBUG_RETURN(error); + } + + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ +#ifndef HAVE_purify + DBUG_PRINT("info",("found first matching record")); + DBUG_DUMP("record[0]", table->record[0], table->s->reclength); +#endif + /* + Below is a minor "optimization". If the key (i.e., key number + 0) has the HA_NOSAME flag set, we know that we have found the + correct record (since there can be no duplicates); otherwise, we + have to compare the record with the one found to see if it is + the correct one. + + CAVEAT! This behaviour is essential for the replication of, + e.g., the mysql.proc table since the correct record *shall* be + found using the primary key *only*. There shall be no + comparison of non-PK columns to decide if the correct record is + found. I can see no scenario where it would be incorrect to + chose the row to change only using a PK or an UNNI. + */ + if (table->key_info->flags & HA_NOSAME) + { + table->file->ha_index_end(); + DBUG_RETURN(0); + } + + /* + In case key is not unique, we still have to iterate over records found + and find the one which is identical to the row given. A copy of the + record we are looking for is stored in record[1]. + */ + DBUG_PRINT("info",("non-unique index, scanning it to find matching record")); + + while (record_compare(table)) + { + /* + We need to set the null bytes to ensure that the filler bit + are all set when returning. There are storage engines that + just set the necessary bits on the bytes and don't set the + filler bits correctly. + + TODO[record format ndb]: Remove this code once NDB returns the + correct record format. + */ + if (table->s->null_bytes > 0) + { + table->record[0][table->s->null_bytes - 1]|= + 256U - (1U << table->s->last_null_bit_pos); + } + + while ((error= table->file->index_next(table->record[0]))) + { + /* We just skip records that has already been deleted */ + if (error == HA_ERR_RECORD_DELETED) + continue; + DBUG_PRINT("info",("no record matching the given row found")); + table->file->print_error(error, MYF(0)); + table->file->ha_index_end(); + DBUG_RETURN(error); + } + } + + /* + Have to restart the scan to be able to fetch the next row. + */ + table->file->ha_index_end(); + } + else + { + DBUG_PRINT("info",("locating record using table scan (rnd_next)")); + + int restart_count= 0; // Number of times scanning has restarted from top + + /* We don't have a key: search the table using rnd_next() */ + if ((error= table->file->ha_rnd_init(1))) + { + DBUG_PRINT("info",("error initializing table scan" + " (ha_rnd_init returns %d)",error)); + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } + + /* Continue until we find the right record or have made a full loop */ + do + { + restart_rnd_next: + error= table->file->rnd_next(table->record[0]); + + switch (error) { + + case 0: + break; + + case HA_ERR_RECORD_DELETED: + goto restart_rnd_next; + + case HA_ERR_END_OF_FILE: + if (++restart_count < 2) + table->file->ha_rnd_init(1); + break; + + default: + DBUG_PRINT("info", ("Failed to get next record" + " (rnd_next returns %d)",error)); + table->file->print_error(error, MYF(0)); + table->file->ha_rnd_end(); + DBUG_RETURN(error); + } + } + while (restart_count < 2 && record_compare(table)); + + /* + Note: above record_compare will take into accout all record fields + which might be incorrect in case a partial row was given in the event + */ + + /* + Have to restart the scan to be able to fetch the next row. + */ + if (restart_count == 2) + DBUG_PRINT("info", ("Record not found")); + else + DBUG_DUMP("record found", table->record[0], table->s->reclength); + table->file->ha_rnd_end(); + + DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0); + DBUG_RETURN(error); + } + + DBUG_RETURN(0); +} + +#endif + + +/************************************************************************** + Write_rows_log_event member functions +**************************************************************************/ + +/* + Constructor used to build an event for writing to the binary log. + */ +#if !defined(MYSQL_CLIENT) +Write_rows_log_event_old::Write_rows_log_event_old(THD *thd_arg, + TABLE *tbl_arg, + ulong tid_arg, + MY_BITMAP const *cols, + bool is_transactional) + : Old_rows_log_event(thd_arg, tbl_arg, tid_arg, cols, is_transactional) +{ + + // This constructor should not be reached. + assert(0); + +} +#endif + + +/* + Constructor used by slave to read the event from the binary log. + */ +#ifdef HAVE_REPLICATION +Write_rows_log_event_old::Write_rows_log_event_old(const char *buf, + uint event_len, + const Format_description_log_event + *description_event) +: Old_rows_log_event(buf, event_len, PRE_GA_WRITE_ROWS_EVENT, + description_event) +{ +} +#endif + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int +Write_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const) +{ + int error= 0; + + /* + We are using REPLACE semantics and not INSERT IGNORE semantics + when writing rows, that is: new rows replace old rows. We need to + inform the storage engine that it should use this behaviour. + */ + + /* Tell the storage engine that we are using REPLACE semantics. */ + thd->lex->duplicates= DUP_REPLACE; + + /* + Pretend we're executing a REPLACE command: this is needed for + InnoDB and NDB Cluster since they are not (properly) checking the + lex->duplicates flag. + */ + thd->lex->sql_command= SQLCOM_REPLACE; + /* + Do not raise the error flag in case of hitting to an unique attribute + */ + m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); + /* + NDB specific: update from ndb master wrapped as Write_rows + */ + /* + so that the event should be applied to replace slave's row + */ + m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + /* + NDB specific: if update from ndb master wrapped as Write_rows + does not find the row it's assumed idempotent binlog applying + is taking place; don't raise the error. + */ + m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY); + /* + TODO: the cluster team (Tomas?) says that it's better if the engine knows + how many rows are going to be inserted, then it can allocate needed memory + from the start. + */ + m_table->file->ha_start_bulk_insert(0); + /* + We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill + any TIMESTAMP column with data from the row but instead will use + the event's current time. + As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra + columns, we know that all TIMESTAMP columns on slave will receive explicit + data from the row, so TIMESTAMP_NO_AUTO_SET is ok. + When we allow a table without TIMESTAMP to be replicated to a table having + more columns including a TIMESTAMP column, or when we allow a TIMESTAMP + column to be replicated into a BIGINT column and the slave's table has a + TIMESTAMP column, then the slave's TIMESTAMP column will take its value + from set_time() which we called earlier (consistent with SBR). And then in + some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to + analyze if explicit data is provided for slave's TIMESTAMP columns). + */ + m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; + return error; +} + + +int +Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const, + int error) +{ + int local_error= 0; + m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); + m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); + /* + reseting the extra with + table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); + fires bug#27077 + todo: explain or fix + */ + if ((local_error= m_table->file->ha_end_bulk_insert())) + { + m_table->file->print_error(local_error, MYF(0)); + } + return error? error : local_error; +} + + +int +Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +{ + DBUG_ASSERT(m_table != NULL); + int error= write_row(rli, TRUE /* overwrite */); + + if (error && !thd->net.last_errno) + thd->net.last_errno= error; + + return error; +} + +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + + +#ifdef MYSQL_CLIENT +void Write_rows_log_event_old::print(FILE *file, + PRINT_EVENT_INFO* print_event_info) +{ + Old_rows_log_event::print_helper(file, print_event_info, "Write_rows_old"); +} +#endif + + +/************************************************************************** + Delete_rows_log_event member functions +**************************************************************************/ + +/* + Constructor used to build an event for writing to the binary log. + */ + +#ifndef MYSQL_CLIENT +Delete_rows_log_event_old::Delete_rows_log_event_old(THD *thd_arg, + TABLE *tbl_arg, + ulong tid, + MY_BITMAP const *cols, + bool is_transactional) + : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional), + m_after_image(NULL), m_memory(NULL) +{ + + // This constructor should not be reached. + assert(0); + +} +#endif /* #if !defined(MYSQL_CLIENT) */ + + +/* + Constructor used by slave to read the event from the binary log. + */ +#ifdef HAVE_REPLICATION +Delete_rows_log_event_old::Delete_rows_log_event_old(const char *buf, + uint event_len, + const Format_description_log_event + *description_event) + : Old_rows_log_event(buf, event_len, PRE_GA_DELETE_ROWS_EVENT, + description_event), + m_after_image(NULL), m_memory(NULL) +{ +} +#endif + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + +int +Delete_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const) +{ + if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && + m_table->s->primary_key < MAX_KEY) + { + /* + We don't need to allocate any memory for m_key since it is not used. + */ + return 0; + } + + if (m_table->s->keys > 0) + { + // Allocate buffer for key searches + m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME)); + if (!m_key) + return HA_ERR_OUT_OF_MEM; + } + return 0; +} + + +int +Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const, + int error) +{ + /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ + m_table->file->ha_index_or_rnd_end(); + my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); + m_key= NULL; + + return error; +} + + +int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +{ + int error; + DBUG_ASSERT(m_table != NULL); + + if (!(error= find_row(rli))) + { + /* + Delete the record found, located in record[0] + */ + error= m_table->file->ha_delete_row(m_table->record[0]); + } + return error; +} + +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + + +#ifdef MYSQL_CLIENT +void Delete_rows_log_event_old::print(FILE *file, + PRINT_EVENT_INFO* print_event_info) +{ + Old_rows_log_event::print_helper(file, print_event_info, "Delete_rows_old"); +} +#endif + + +/************************************************************************** + Update_rows_log_event member functions +**************************************************************************/ + +/* + Constructor used to build an event for writing to the binary log. + */ +#if !defined(MYSQL_CLIENT) +Update_rows_log_event_old::Update_rows_log_event_old(THD *thd_arg, + TABLE *tbl_arg, + ulong tid, + MY_BITMAP const *cols, + bool is_transactional) + : Old_rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional), + m_after_image(NULL), m_memory(NULL) +{ + + // This constructor should not be reached. + assert(0); +} +#endif /* !defined(MYSQL_CLIENT) */ + + +/* + Constructor used by slave to read the event from the binary log. + */ +#ifdef HAVE_REPLICATION +Update_rows_log_event_old::Update_rows_log_event_old(const char *buf, + uint event_len, + const + Format_description_log_event + *description_event) + : Old_rows_log_event(buf, event_len, PRE_GA_UPDATE_ROWS_EVENT, + description_event), + m_after_image(NULL), m_memory(NULL) +{ +} +#endif + + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + +int +Update_rows_log_event_old::do_before_row_operations(const Slave_reporting_capability *const) +{ + if (m_table->s->keys > 0) + { + // Allocate buffer for key searches + m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME)); + if (!m_key) + return HA_ERR_OUT_OF_MEM; + } + + m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; + + return 0; +} + + +int +Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capability *const, + int error) +{ + /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ + m_table->file->ha_index_or_rnd_end(); + my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc + m_key= NULL; + + return error; +} + + +int +Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +{ + DBUG_ASSERT(m_table != NULL); + + int error= find_row(rli); + if (error) + { + /* + We need to read the second image in the event of error to be + able to skip to the next pair of updates + */ + m_curr_row= m_curr_row_end; + unpack_current_row(rli); + return error; + } + + /* + This is the situation after locating BI: + + ===|=== before image ====|=== after image ===|=== + ^ ^ + m_curr_row m_curr_row_end + + BI found in the table is stored in record[0]. We copy it to record[1] + and unpack AI to record[0]. + */ + + store_record(m_table,record[1]); + + m_curr_row= m_curr_row_end; + error= unpack_current_row(rli); // this also updates m_curr_row_end + + /* + Now we have the right row to update. The old row (the one we're + looking for) is in record[1] and the new row is in record[0]. + */ +#ifndef HAVE_purify + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ + DBUG_PRINT("info",("Updating row in table")); + DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength); + DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength); +#endif + + error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]); + if (error == HA_ERR_RECORD_IS_THE_SAME) + error= 0; + + return error; +} + +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ + + +#ifdef MYSQL_CLIENT +void Update_rows_log_event_old::print(FILE *file, + PRINT_EVENT_INFO* print_event_info) +{ + Old_rows_log_event::print_helper(file, print_event_info, "Update_rows_old"); +} +#endif |