diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 835 |
1 files changed, 364 insertions, 471 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index f71d1bb3622..9f3b4c4e487 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -5654,13 +5654,14 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, m_table_id(tid), m_width(tbl_arg ? tbl_arg->s->fields : 1), m_rows_buf(0), m_rows_cur(0), m_rows_end(0), - m_flags(0) + m_curr_row(NULL), m_curr_row_end(NULL), + m_flags(0), m_key(NULL) { /* We allow a special form of dummy event when the table, and cols are null and the table id is ~0UL. This is a temporary solution, to be able to terminate a started statement in the - binary log: the extreneous events will be removed in the future. + binary log: the extraneous events will be removed in the future. */ DBUG_ASSERT(tbl_arg && tbl_arg->s && tid != ~0UL || !tbl_arg && !cols && tid == ~0UL); @@ -5669,7 +5670,7 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, set_flags(NO_FOREIGN_KEY_CHECKS_F); if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS) set_flags(RELAXED_UNIQUE_CHECKS_F); - /* if bitmap_init fails, catched in is_valid() */ + /* if bitmap_init fails, caught in is_valid() */ if (likely(!bitmap_init(&m_cols, m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL, m_width, @@ -5696,7 +5697,10 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, *description_event) : Log_event(buf, description_event), m_row_count(0), - m_rows_buf(0), m_rows_cur(0), m_rows_end(0) + m_table(NULL), + m_rows_buf(0), m_rows_cur(0), m_rows_end(0), + m_curr_row(NULL), m_curr_row_end(NULL), + m_key(NULL) { DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)"); uint8 const common_header_len= description_event->common_header_len; @@ -5755,7 +5759,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, { DBUG_PRINT("debug", ("Reading from %p", ptr_after_width)); - /* if bitmap_init fails, catched in is_valid() */ + /* if bitmap_init fails, caught in is_valid() */ if (likely(!bitmap_init(&m_cols_ai, m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL, m_width, @@ -5785,6 +5789,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len, m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME)); if (likely((bool)m_rows_buf)) { + m_curr_row= m_rows_buf; m_rows_end= m_rows_buf + data_size; m_rows_cur= m_rows_end; memcpy(m_rows_buf, ptr_rows_data, data_size); @@ -5889,7 +5894,6 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { DBUG_ENTER("Rows_log_event::do_apply_event(st_relay_log_info*)"); int error= 0; - uchar const *row_start= m_rows_buf; /* If m_table_id == ~0UL, then we have a dummy event that does not @@ -6049,7 +6053,9 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli) #endif } - TABLE* table= const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(m_table_id); + TABLE* + table= + m_table= const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(m_table_id); if (table) { @@ -6096,22 +6102,41 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli) */ const_cast<RELAY_LOG_INFO*>(rli)->set_flag(RELAY_LOG_INFO::IN_STMT); - error= do_before_row_operations(table); - while (error == 0 && row_start < m_rows_end) - { - uchar const *row_end= NULL; - if ((error= do_prepare_row(thd, rli, table, row_start, &row_end))) - break; // We should perform the after-row operation even in - // the case of error + if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols)) + set_flags(COMPLETE_ROWS_F); - DBUG_ASSERT(row_end != NULL); // cannot happen - DBUG_ASSERT(row_end <= m_rows_end); + /* + Set tables write and read sets. + + Read_set contains all slave columns (in case we are going to fetch + a complete record from slave) + + Write_set equals the m_cols bitmap sent from master but it can be + longer if slave has extra columns. + */ + + DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols); + + bitmap_set_all(table->read_set); + bitmap_set_all(table->write_set); + if (!get_flags(COMPLETE_ROWS_F)) + bitmap_intersect(table->write_set,&m_cols); + // Do event specific preparations + + error= do_before_row_operations(rli); + + // row processing loop + + while (error == 0 && m_curr_row < m_rows_end) + { /* in_use can have been set to NULL in close_tables_for_reopen */ THD* old_thd= table->in_use; if (!table->in_use) table->in_use= thd; - error= do_exec_row(table); + + error= do_exec_row(rli); + table->in_use = old_thd; switch (error) { @@ -6132,21 +6157,38 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli) break; } - row_start= row_end; - } + /* + If m_curr_row_end was not set during event execution (e.g., because + of errors) we can't proceed to the next row. If the error is transient + (i.e., error==0 at this point) we must call unpack_current_row() to set + m_curr_row_end. + */ + + if (!m_curr_row_end && !error) + unpack_current_row(rli); + + // at this moment m_curr_row_end should be set + DBUG_ASSERT(error || m_curr_row_end != NULL); + DBUG_ASSERT(error || m_curr_row < m_curr_row_end); + DBUG_ASSERT(error || m_curr_row_end <= m_rows_end); + + m_curr_row= m_curr_row_end; + + } // row processing loop + DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event", const_cast<RELAY_LOG_INFO*>(rli)->abort_slave= 1;); - error= do_after_row_operations(table, error); + error= do_after_row_operations(rli, error); if (!cache_stmt) { DBUG_PRINT("info", ("Marked that we need to keep log")); thd->options|= OPTION_KEEP_LOG; } - } + } // if (table) /* - We need to delay this clear until the table def is no longer needed. - The table def is needed in unpack_row(). + We need to delay this clear until the table def stored in m_table_def is no + longer needed. It is used in unpack_current_row(). */ if (rli->tables_to_lock && get_flags(STMT_END_F)) const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock(); @@ -6199,7 +6241,7 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli) wait (reached end of last relay log and nothing gets appended there), we timeout after one minute, and notify DBA about the problem. When WL#2975 is implemented, just remove the member - st_relay_log_info::last_event_start_time and all its occurences. + st_relay_log_info::last_event_start_time and all its occurrences. */ const_cast<RELAY_LOG_INFO*>(rli)->last_event_start_time= my_time(0); } @@ -6603,7 +6645,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid, else m_data_size+= m_field_metadata_size + 1; - /* If malloc fails, catched in is_valid() */ + /* If malloc fails, caught in is_valid() */ if ((m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME)))) { m_coltype= reinterpret_cast<uchar*>(m_memory); @@ -6708,7 +6750,7 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len, (ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart), m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart))); - /* Allocate mem for all fields in one go. If fails, catched in is_valid() */ + /* Allocate mem for all fields in one go. If fails, caught in is_valid() */ m_memory= (uchar*) my_multi_malloc(MYF(MY_WME), &m_dbnam, (uint) m_dblen + 1, &m_tblnam, (uint) m_tbllen + 1, @@ -7034,7 +7076,8 @@ Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len, #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Write_rows_log_event::do_before_row_operations(TABLE *table) +int +Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) { int error= 0; @@ -7056,26 +7099,26 @@ int Write_rows_log_event::do_before_row_operations(TABLE *table) /* Do not raise the error flag in case of hitting to an unique attribute */ - table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); + m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); /* NDB specific: update from ndb master wrapped as Write_rows */ /* so that the event should be applied to replace slave's row */ - table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); /* NDB specific: if update from ndb master wrapped as Write_rows does not find the row it's assumed idempotent binlog applying is taking place; don't raise the error. */ - table->file->extra(HA_EXTRA_IGNORE_NO_KEY); + m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY); /* TODO: the cluster team (Tomas?) says that it's better if the engine knows how many rows are going to be inserted, then it can allocate needed memory from the start. */ - table->file->ha_start_bulk_insert(0); + m_table->file->ha_start_bulk_insert(0); /* We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill any TIMESTAMP column with data from the row but instead will use @@ -7091,47 +7134,31 @@ int Write_rows_log_event::do_before_row_operations(TABLE *table) some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to analyze if explicit data is provided for slave's TIMESTAMP columns). */ - table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; + m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; return error; } -int Write_rows_log_event::do_after_row_operations(TABLE *table, int error) +int +Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, + int error) { int local_error= 0; - table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); - table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); + m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); + m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); /* reseting the extra with table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); fires bug#27077 todo: explain or fix */ - if ((local_error= table->file->ha_end_bulk_insert())) + if ((local_error= m_table->file->ha_end_bulk_insert())) { - table->file->print_error(local_error, MYF(0)); + m_table->file->print_error(local_error, MYF(0)); } return error? error : local_error; } - -int Write_rows_log_event::do_prepare_row(THD *thd_arg, - RELAY_LOG_INFO const *rli, - TABLE *table, - uchar const *const row_start, - uchar const **const row_end) -{ - DBUG_ASSERT(table != NULL); - DBUG_ASSERT(row_start && row_end); - - if (int error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end, - &m_master_reclength, table->write_set, WRITE_ROWS_EVENT)) - { - thd_arg->net.last_errno= error; - return error; - } - bitmap_copy(table->read_set, table->write_set); - return 0; -} +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) /* Check if there are more UNIQUE keys after the given key. @@ -7145,135 +7172,6 @@ last_uniq_key(TABLE *table, uint keyno) return 1; } -/* Anonymous namespace for template functions/classes */ -namespace { - - /* - Smart pointer that will automatically call my_afree (a macro) when - the pointer goes out of scope. This is used so that I do not have - to remember to call my_afree() before each return. There is no - overhead associated with this, since all functions are inline. - - I (Matz) would prefer to use the free function as a template - parameter, but that is not possible when the "function" is a - macro. - */ - template <class Obj> - class auto_afree_ptr - { - Obj* m_ptr; - public: - auto_afree_ptr(Obj* ptr) : m_ptr(ptr) { } - ~auto_afree_ptr() { if (m_ptr) my_afree(m_ptr); } - void assign(Obj* ptr) { - /* Only to be called if it hasn't been given a value before. */ - DBUG_ASSERT(m_ptr == NULL); - m_ptr= ptr; - } - Obj* get() { return m_ptr; } - }; - -} - - -/* - Copy "extra" columns from record[1] to record[0]. - - Copy the extra fields that are not present on the master but are - present on the slave from record[1] to record[0]. This is used - after fetching a record that are to be updated, either inside - replace_record() or as part of executing an update_row(). - */ -static int -copy_extra_record_fields(TABLE *table, - size_t master_reclength, - my_ptrdiff_t master_fields) -{ - DBUG_ENTER("copy_extra_record_fields(table, master_reclen, master_fields)"); - DBUG_PRINT("info", ("Copying to 0x%lx " - "from field %lu at offset %lu " - "to field %d at offset %lu", - (long) table->record[0], - (ulong) master_fields, (ulong) master_reclength, - table->s->fields, table->s->reclength)); - /* - Copying the extra fields of the slave that does not exist on - master into record[0] (which are basically the default values). - */ - - if (table->s->fields < (uint) master_fields) - DBUG_RETURN(0); - - DBUG_ASSERT(master_reclength <= table->s->reclength); - if (master_reclength < table->s->reclength) - bmove_align(table->record[0] + master_reclength, - table->record[1] + master_reclength, - table->s->reclength - master_reclength); - - /* - Bit columns are special. We iterate over all the remaining - columns and copy the "extra" bits to the new record. This is - not a very good solution: it should be refactored on - opportunity. - - REFACTORING SUGGESTION (Matz). Introduce a member function - similar to move_field_offset() called copy_field_offset() to - copy field values and implement it for all Field subclasses. Use - this function to copy data from the found record to the record - that are going to be inserted. - - The copy_field_offset() function need to be a virtual function, - which in this case will prevent copying an entire range of - fields efficiently. - */ - { - Field **field_ptr= table->field + master_fields; - for ( ; *field_ptr ; ++field_ptr) - { - /* - Set the null bit according to the values in record[1] - */ - if ((*field_ptr)->maybe_null() && - (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1]))) - (*field_ptr)->set_null(); - else - (*field_ptr)->set_notnull(); - - /* - Do the extra work for special columns. - */ - switch ((*field_ptr)->real_type()) - { - default: - /* Nothing to do */ - break; - - case MYSQL_TYPE_BIT: - Field_bit *f= static_cast<Field_bit*>(*field_ptr); - if (f->bit_len > 0) - { - my_ptrdiff_t const offset= table->record[1] - table->record[0]; - uchar const bits= - get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len); - set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len); - } - break; - } - } - } - DBUG_RETURN(0); // All OK -} - -#define DBUG_PRINT_BITSET(N,FRM,BS) \ - do { \ - char buf[256]; \ - for (uint i = 0 ; i < (BS)->n_bits ; ++i) \ - buf[i] = bitmap_is_set((BS), i) ? '1' : '0'; \ - buf[(BS)->n_bits] = '\0'; \ - DBUG_PRINT((N), ((FRM), buf)); \ - } while (0) - - /** Check if an error is a duplicate key error. @@ -7299,45 +7197,76 @@ is_duplicate_key_error(int errcode) return false; } +/** + Write the current row into event's table. -/* - Replace the provided record in the database. + The row is located in the row buffer, pointed by @c m_curr_row member. + Number of columns of the row is stored in @c m_width member (it can be + different from the number of columns in the table to which we insert). + Bitmap @c m_cols indicates which columns are present in the row. It is assumed + that event's table is already open and pointed by @c m_table. - SYNOPSIS - replace_record() - thd Thread context for writing the record. - table Table to which record should be written. - master_reclength - Offset to first column that is not present on the master, - alternatively the length of the record on the master - side. + If the same record already exists in the table it can be either overwritten + or an error is reported depending on the value of @c overwrite flag + (error reporting not yet implemented). Note that the matching record can be + different from the row we insert if we use primary keys to identify records in + the table. - RETURN VALUE - Error code on failure, 0 on success. + The row to be inserted can contain values only for selected columns. The + missing columns are filled with default values using @c prepare_record() + function. If a matching record is found in the table and @c overwritte is + true, the missing columns are taken from it. - DESCRIPTION - Similar to how it is done in mysql_insert(), we first try to do - a ha_write_row() and of that fails due to duplicated keys (or - indices), we do an ha_update_row() or a ha_delete_row() instead. - */ -static int -replace_record(THD *thd, TABLE *table, - ulong const master_reclength, - uint const master_fields) + @param rli Relay log info (needed for row unpacking). + @param overwrite + Shall we overwrite if the row already exists or signal + error (currently ignored). + + @returns Error code on failure, 0 on success. + + This method, if successful, sets @c m_curr_row_end pointer to point at the + next row in the rows buffer. This is done when unpacking the row to be + inserted. + + @note If a matching record is found, it is either updated using + @c ha_update_row() or first deleted and then new record written. +*/ + +int +Rows_log_event::write_row(const RELAY_LOG_INFO *const rli, + const bool overwrite) { - DBUG_ENTER("replace_record"); - DBUG_ASSERT(table != NULL && thd != NULL); + DBUG_ENTER("write_row"); + DBUG_ASSERT(m_table != NULL && thd != NULL); + TABLE *table= m_table; // pointer to event's table int error; int keynum; auto_afree_ptr<char> key(NULL); + /* fill table->record[0] with default values */ + + if ((error= prepare_record(rli, table, m_width, + TRUE /* check if columns have def. values */))) + DBUG_RETURN(error); + + /* unpack row into table->record[0] */ + error= unpack_current_row(rli); // TODO: how to handle errors? + #ifndef DBUG_OFF DBUG_DUMP("record[0]", table->record[0], table->s->reclength); DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set); DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set); #endif + /* + Try to write record. If a corresponding record already exists in the table, + we try to change it using ha_update_row() if possible. Otherwise we delete + it and repeat the whole process again. + + TODO: Add safety measures against infinite looping. + */ + while ((error= table->file->ha_write_row(table->record[0]))) { if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT) @@ -7347,6 +7276,7 @@ replace_record(THD *thd, TABLE *table, } if ((keynum= table->file->get_dup_key(error)) < 0) { + DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum)); table->file->print_error(error, MYF(0)); /* We failed to retrieve the duplicate key @@ -7368,17 +7298,22 @@ replace_record(THD *thd, TABLE *table, */ if (table->file->ha_table_flags() & HA_DUPLICATE_POS) { + DBUG_PRINT("info",("Locating offending record using rnd_pos()")); error= table->file->rnd_pos(table->record[1], table->file->dup_ref); if (error) { + DBUG_PRINT("info",("rnd_pos() returns error %d",error)); table->file->print_error(error, MYF(0)); DBUG_RETURN(error); } } else { + DBUG_PRINT("info",("Locating offending record using index_read_idx()")); + if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) { + DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE")); DBUG_RETURN(my_errno); } @@ -7386,7 +7321,10 @@ replace_record(THD *thd, TABLE *table, { key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length))); if (key.get() == NULL) + { + DBUG_PRINT("info",("Can't allocate key buffer")); DBUG_RETURN(ENOMEM); + } } key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum, @@ -7397,20 +7335,33 @@ replace_record(THD *thd, TABLE *table, HA_READ_KEY_EXACT); if (error) { + DBUG_PRINT("info",("index_read_idx() returns error %d",error)); table->file->print_error(error, MYF(0)); DBUG_RETURN(error); } } /* - Now, table->record[1] should contain the offending row. That + Now, record[1] should contain the offending row. That will enable us to update it or, alternatively, delete it (so that we can insert the new row afterwards). + */ - First we copy the columns into table->record[0] that are not - present on the master from table->record[1], if there are any. + /* + If row is incomplete we will use the record found to fill + missing columns. */ - copy_extra_record_fields(table, master_reclength, master_fields); + if (!get_flags(COMPLETE_ROWS_F)) + { + restore_record(table,record[1]); + error= unpack_current_row(rli); + } + +#ifndef DBUG_OFF + DBUG_PRINT("debug",("preparing for update: before and after image")); + DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength); + DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength); +#endif /* REPLACE is defined as either INSERT or DELETE + INSERT. If @@ -7430,18 +7381,32 @@ replace_record(THD *thd, TABLE *table, if (last_uniq_key(table, keynum) && !table->file->referenced_by_foreign_key()) { + DBUG_PRINT("info",("Updating row using ha_update_row()")); error=table->file->ha_update_row(table->record[1], table->record[0]); - if (error && error != HA_ERR_RECORD_IS_THE_SAME) - table->file->print_error(error, MYF(0)); - else + switch (error) { + + case HA_ERR_RECORD_IS_THE_SAME: + DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from" + " ha_update_row()")); error= 0; + + case 0: + break; + + default: + DBUG_PRINT("info",("ha_update_row() returns error %d",error)); + table->file->print_error(error, MYF(0)); + } + DBUG_RETURN(error); } else { + DBUG_PRINT("info",("Deleting offending row and trying to write new one again")); if ((error= table->file->ha_delete_row(table->record[1]))) { + DBUG_PRINT("info",("ha_delete_row() returns error %d",error)); table->file->print_error(error, MYF(0)); DBUG_RETURN(error); } @@ -7452,12 +7417,20 @@ replace_record(THD *thd, TABLE *table, DBUG_RETURN(error); } -int Write_rows_log_event::do_exec_row(TABLE *table) +#endif + +int +Write_rows_log_event::do_exec_row(const RELAY_LOG_INFO *const rli) { - DBUG_ASSERT(table != NULL); - int error= replace_record(thd, table, m_master_reclength, m_width); - return error; + DBUG_ASSERT(m_table != NULL); + int error= write_row(rli, TRUE /* overwrite */); + + if (error && !thd->net.last_errno) + thd->net.last_errno= error; + + return error; } + #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ #ifdef MYSQL_CLIENT @@ -7550,40 +7523,52 @@ record_compare_exit: return result; } +/** + Locate the current row in event's table. -/* - Find the row given by 'key', if the table has keys, or else use a table scan - to find (and fetch) the row. - - If the engine allows random access of the records, a combination of - position() and rnd_pos() will be used. + The current row is pointed by @c m_curr_row. Member @c m_width tells how many + columns are there in the row (this can be differnet from the number of columns + in the table). It is assumed that event's table is already open and pointed + by @c m_table. - @param table Pointer to table to search - @param key Pointer to key to use for search, if table has key + If a corresponding record is found in the table it is stored in + @c m_table->record[0]. Note that when record is located based on a primary + key, it is possible that the record found differs from the row being located. - @pre <code>table->record[0]</code> shall contain the row to locate - and <code>key</code> shall contain a key to use for searching, if - the engine has a key. + If no key is specified or table does not have keys, a table scan is used to + find the row. In that case the row should be complete and contain values for + all columns. However, it can still be shorter than the table, i.e. the table + can contain extra columns not present in the row. It is also possible that + the table has fewer columns than the row being located. - @post If the return value is zero, <code>table->record[1]</code> - will contain the fetched row and the internal "cursor" will refer to - the row. If the return value is non-zero, - <code>table->record[1]</code> is undefined. In either case, - <code>table->record[0]</code> is undefined. + @returns Error code on failure, 0 on success. + + @post In case of success @c m_table->record[0] contains the record found. + Also, the internal "cursor" of the table is positioned at the record found. - @return Zero if the row was successfully fetched into - <code>table->record[1]</code>, error code otherwise. + @note If the engine allows random access of the records, a combination of + @c position() and @c rnd_pos() will be used. */ -static int find_and_fetch_row(TABLE *table, uchar *key) +int Rows_log_event::find_row(const RELAY_LOG_INFO *rli) { - DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)"); - DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx record: 0x%lx", - (long) table, (long) key, (long) table->record[1])); + DBUG_ENTER("find_row"); + + DBUG_ASSERT(m_table && m_table->in_use != NULL); - DBUG_ASSERT(table->in_use != NULL); + TABLE *table= m_table; + int error; + + /* unpack row - missing fields get default values */ + // TODO: shall we check and report errors here? + prepare_record(NULL,table,m_width,FALSE /* don't check errors */); + error= unpack_current_row(rli); + +#ifndef DBUG_OFF + DBUG_PRINT("info",("looking for the following record")); DBUG_DUMP("record[0]", table->record[0], table->s->reclength); +#endif if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && table->s->primary_key < MAX_KEY) @@ -7606,34 +7591,48 @@ static int find_and_fetch_row(TABLE *table, uchar *key) table->s->reclength) == 0); */ + DBUG_PRINT("info",("locating record using primary key (position)")); table->file->position(table->record[0]); - int error= table->file->rnd_pos(table->record[0], table->file->ref); - /* - rnd_pos() returns the record in table->record[0], so we have to - move it to table->record[1]. - */ - bmove_align(table->record[1], table->record[0], table->s->reclength); + error= table->file->rnd_pos(table->record[0], table->file->ref); + if (error) + { + DBUG_PRINT("info",("rnd_pos returns error %d",error)); + table->file->print_error(error, MYF(0)); + } DBUG_RETURN(error); } - /* We need to retrieve all fields */ - /* TODO: Move this out from this function to main loop */ + // We can't use pisition() - try other methods. + + /* + We need to retrieve all fields + TODO: Move this out from this function to main loop + */ table->use_all_columns(); if (table->s->keys > 0) { - int error; + DBUG_PRINT("info",("locating record using primary key (index_read)")); + /* We have a key: search the table using the index */ if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE))) + { + DBUG_PRINT("info",("ha_index_init returns error %d",error)); + table->file->print_error(error, MYF(0)); DBUG_RETURN(error); + } - /* - Don't print debug messages when running valgrind since they can - trigger false warnings. - */ + /* Fill key data for the row */ + + DBUG_ASSERT(m_key); + key_copy(m_key, table->record[0], table->key_info, 0); + + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ #ifndef HAVE_purify - DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength); - DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength); + DBUG_DUMP("key data", m_key, table->key_info->key_length); #endif /* @@ -7645,9 +7644,11 @@ static int find_and_fetch_row(TABLE *table, uchar *key) my_ptrdiff_t const pos= table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0; table->record[1][pos]= 0xFF; - if ((error= table->file->index_read_map(table->record[1], key, HA_WHOLE_KEY, + if ((error= table->file->index_read_map(table->record[1], m_key, + HA_WHOLE_KEY, HA_READ_KEY_EXACT))) { + DBUG_PRINT("info",("no record matching the key found in the table")); table->file->print_error(error, MYF(0)); table->file->ha_index_end(); DBUG_RETURN(error); @@ -7658,8 +7659,8 @@ static int find_and_fetch_row(TABLE *table, uchar *key) trigger false warnings. */ #ifndef HAVE_purify - DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength); - DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength); + DBUG_PRINT("info",("found first matching record")); + DBUG_DUMP("record[0]", table->record[0], table->s->reclength); #endif /* Below is a minor "optimization". If the key (i.e., key number @@ -7681,10 +7682,15 @@ static int find_and_fetch_row(TABLE *table, uchar *key) DBUG_RETURN(0); } + /* + In case key is not unique, we still have to iterate over records found + and find the one which is identical to the row given. The row is unpacked + in record[1] where missing columns are filled with default values. + */ + DBUG_PRINT("info",("non-unique index, scanning it to find matching record")); + while (record_compare(table)) { - int error; - /* We need to set the null bytes to ensure that the filler bit are all set when returning. There are storage engines that @@ -7702,9 +7708,10 @@ static int find_and_fetch_row(TABLE *table, uchar *key) if ((error= table->file->index_next(table->record[1]))) { - table->file->print_error(error, MYF(0)); + DBUG_PRINT("info",("no record matching the given row found")); + table->file->print_error(error, MYF(0)); table->file->ha_index_end(); - DBUG_RETURN(error); + DBUG_RETURN(error); } } @@ -7715,44 +7722,57 @@ static int find_and_fetch_row(TABLE *table, uchar *key) } else { + DBUG_PRINT("info",("locating record using table scan (rnd_next)")); + int restart_count= 0; // Number of times scanning has restarted from top - int error; /* We don't have a key: search the table using rnd_next() */ if ((error= table->file->ha_rnd_init(1))) - return error; + { + DBUG_PRINT("info",("error initializing table scan" + " (ha_rnd_init returns %d)",error)); + table->file->print_error(error, MYF(0)); + DBUG_RETURN(error); + } /* Continue until we find the right record or have made a full loop */ do { error= table->file->rnd_next(table->record[1]); - DBUG_DUMP("record[0]", table->record[0], table->s->reclength); - DBUG_DUMP("record[1]", table->record[1], table->s->reclength); - switch (error) { + case 0: + DBUG_DUMP("record found", table->record[0], table->s->reclength); + case HA_ERR_RECORD_DELETED: - break; + break; case HA_ERR_END_OF_FILE: - if (++restart_count < 2) - table->file->ha_rnd_init(1); - break; + if (++restart_count < 2) + table->file->ha_rnd_init(1); + break; default: - table->file->print_error(error, MYF(0)); - DBUG_PRINT("info", ("Record not found")); + DBUG_PRINT("info", ("Failed to get next record" + " (rnd_next returns %d)",error)); + table->file->print_error(error, MYF(0)); table->file->ha_rnd_end(); - DBUG_RETURN(error); + DBUG_RETURN(error); } } while (restart_count < 2 && record_compare(table)); + + /* + Note: above record_compare will take into accout all record fields + which might be incorrect in case a partial row was given in the event + */ /* Have to restart the scan to be able to fetch the next row. */ - DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : "")); + if (restart_count == 2) + DBUG_PRINT("info", ("Record not found")); table->file->ha_rnd_end(); DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0); @@ -7761,6 +7781,7 @@ static int find_and_fetch_row(TABLE *table, uchar *key) DBUG_RETURN(0); } + #endif /* @@ -7772,9 +7793,6 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid, MY_BITMAP const *cols, bool is_transactional) : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional) -#ifdef HAVE_REPLICATION - ,m_memory(NULL), m_key(NULL), m_after_image(NULL) -#endif { } #endif /* #if !defined(MYSQL_CLIENT) */ @@ -7786,23 +7804,18 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg, Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len, const Format_description_log_event *description_event) -#if defined(MYSQL_CLIENT) : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event) -#else - : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event), - m_memory(NULL), m_key(NULL), m_after_image(NULL) -#endif { } #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Delete_rows_log_event::do_before_row_operations(TABLE *table) -{ - DBUG_ASSERT(m_memory == NULL); - if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && - table->s->primary_key < MAX_KEY) +int +Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) +{ + if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) && + m_table->s->primary_key < MAX_KEY) { /* We don't need to allocate any memory for m_after_image and @@ -7811,82 +7824,39 @@ int Delete_rows_log_event::do_before_row_operations(TABLE *table) return 0; } - int error= 0; - - if (table->s->keys > 0) - { - m_memory= (uchar*) my_multi_malloc(MYF(MY_WME), - &m_after_image, - (uint) table->s->reclength, - &m_key, - (uint) table->key_info->key_length, - NullS); - } - else + if (m_table->s->keys > 0) { - m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME)); - m_memory= (uchar*)m_after_image; - m_key= NULL; + // Allocate buffer for key searches + m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME)); + if (!m_key) + return HA_ERR_OUT_OF_MEM; } - if (!m_memory) - return HA_ERR_OUT_OF_MEM; - - return error; + return 0; } -int Delete_rows_log_event::do_after_row_operations(TABLE *table, int error) +int +Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, + int error) { /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ - table->file->ha_index_or_rnd_end(); - my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc - m_memory= NULL; - m_after_image= NULL; + m_table->file->ha_index_or_rnd_end(); + my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); m_key= NULL; return error; } -int Delete_rows_log_event::do_prepare_row(THD *thd_arg, - RELAY_LOG_INFO const *rli, - TABLE *table, - uchar const *const row_start, - uchar const **const row_end) -{ - DBUG_ASSERT(row_start && row_end); - if (int error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end, - &m_master_reclength, table->read_set, DELETE_ROWS_EVENT)) - { - thd_arg->net.last_errno= error; - return error; - } - - /* - If we will access rows using the random access method, m_key will - be set to NULL, so we do not need to make a key copy in that case. - */ - if (m_key) - { - KEY *const key_info= table->key_info; - - key_copy(m_key, table->record[0], key_info, 0); - } - - return 0; -} - -int Delete_rows_log_event::do_exec_row(TABLE *table) +int Delete_rows_log_event::do_exec_row(const RELAY_LOG_INFO *const rli) { int error; - DBUG_ASSERT(table != NULL); + DBUG_ASSERT(m_table != NULL); - if (!(error= find_and_fetch_row(table, m_key))) + if (!(error= find_row(rli))) { /* - Now we should have the right row to delete. We are using - record[0] since it is guaranteed to point to a record with the - correct value. + Delete the record found, located in record[0] */ - error= table->file->ha_delete_row(table->record[0]); + error= m_table->file->ha_delete_row(m_table->record[0]); } return error; } @@ -7916,10 +7886,6 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg, MY_BITMAP const *cols_ai, bool is_transactional) : Rows_log_event(thd_arg, tbl_arg, tid, cols_bi, is_transactional) -#ifdef HAVE_REPLICATION - , m_memory(NULL), m_key(NULL) - -#endif { init(cols_ai); } @@ -7929,16 +7895,13 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg, MY_BITMAP const *cols, bool is_transactional) : Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional) -#ifdef HAVE_REPLICATION - , m_memory(NULL), m_key(NULL) -#endif { init(cols); } void Update_rows_log_event::init(MY_BITMAP const *cols) { - /* if bitmap_init fails, catched in is_valid() */ + /* if bitmap_init fails, caught in is_valid() */ if (likely(!bitmap_init(&m_cols_ai, m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL, m_width, @@ -7971,157 +7934,87 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len, const Format_description_log_event *description_event) -#if defined(MYSQL_CLIENT) : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event) -#else - : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event), - m_memory(NULL), m_key(NULL) -#endif { } #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Update_rows_log_event::do_before_row_operations(TABLE *table) -{ - DBUG_ASSERT(m_memory == NULL); - - int error= 0; - if (table->s->keys > 0) - { - m_memory= (uchar*) my_multi_malloc(MYF(MY_WME), - &m_after_image, - (uint) table->s->reclength, - &m_key, - (uint) table->key_info->key_length, - NullS); - } - else +int +Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const) +{ + if (m_table->s->keys > 0) { - m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME)); - m_memory= m_after_image; - m_key= NULL; + // Allocate buffer for key searches + m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME)); + if (!m_key) + return HA_ERR_OUT_OF_MEM; } - if (!m_memory) - return HA_ERR_OUT_OF_MEM; - table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; + m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET; - return error; + return 0; } -int Update_rows_log_event::do_after_row_operations(TABLE *table, int error) +int +Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const, + int error) { - /* - error= ToDo:find out what this should really be, this triggers - close_scan in nbd, returning error? - */ - table->file->ha_index_or_rnd_end(); - my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); - m_memory= NULL; - m_after_image= NULL; + /*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/ + m_table->file->ha_index_or_rnd_end(); + my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc m_key= NULL; return error; } -int Update_rows_log_event::do_prepare_row(THD *thd_arg, - RELAY_LOG_INFO const *rli, - TABLE *table, - uchar const *const row_start, - uchar const **const row_end) +int +Update_rows_log_event::do_exec_row(const RELAY_LOG_INFO *const rli) { - int error; - DBUG_ASSERT(row_start && row_end); - /* - We need to perform some juggling below since unpack_row() always - unpacks into table->record[0]. For more information, see the - comments for unpack_row(). - */ + DBUG_ASSERT(m_table != NULL); - /* record[0] is the before image for the update */ - if ((error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end, - &m_master_reclength, table->read_set, - UPDATE_ROWS_EVENT))) - { - thd_arg->net.last_errno= error; - return error; - } - - store_record(table, record[1]); - uchar const *next_start = *row_end; - /* m_after_image is the after image for the update */ - if ((error= unpack_row(rli, table, m_width, next_start, &m_cols_ai, row_end, - &m_master_reclength, table->write_set, - UPDATE_ROWS_EVENT))) - { - thd_arg->net.last_errno= error; + int error= find_row(rli); + if (error) return error; - } - - bmove_align(m_after_image, table->record[0], table->s->reclength); - restore_record(table, record[1]); - - /* - Don't print debug messages when running valgrind since they can - trigger false warnings. - */ -#ifndef HAVE_purify - DBUG_DUMP("record[0]", table->record[0], table->s->reclength); - DBUG_DUMP("m_after_image", m_after_image, table->s->reclength); -#endif /* - If we will access rows using the random access method, m_key will - be set to NULL, so we do not need to make a key copy in that case. - */ - if (m_key) - { - KEY *const key_info= table->key_info; - - key_copy(m_key, table->record[0], key_info, 0); - } + This is the situation after locating BI: - return error; -} + ===|=== before image ====|=== after image ===|=== + ^ ^ + m_curr_row m_curr_row_end -int Update_rows_log_event::do_exec_row(TABLE *table) -{ - DBUG_ASSERT(table != NULL); + BI found in the table is stored in record[0]. We copy it to record[1] + and unpack AI to record[0]. + */ - int error= find_and_fetch_row(table, m_key); - if (error) - return error; + store_record(m_table,record[1]); - /* - We have to ensure that the new record (i.e., the after image) is - in record[0] and the old record (i.e., the before image) is in - record[1]. This since some storage engines require this (for - example, the partition engine). - - Since find_and_fetch_row() puts the fetched record (i.e., the old - record) in record[1], we can keep it there. We put the new record - (i.e., the after image) into record[0], and copy the fields that - are on the slave (i.e., in record[1]) into record[0], effectively - overwriting the default values that where put there by the - unpack_row() function. - */ - bmove_align(table->record[0], m_after_image, table->s->reclength); - copy_extra_record_fields(table, m_master_reclength, m_width); + m_curr_row= m_curr_row_end; + error= unpack_current_row(rli); // this also updates m_curr_row_end /* Now we have the right row to update. The old row (the one we're - looking for) is in record[1] and the new row has is in record[0]. - We also have copied the original values already in the slave's - database into the after image delivered from the master. + looking for) is in record[1] and the new row is in record[0]. */ - error= table->file->ha_update_row(table->record[1], table->record[0]); +#ifndef HAVE_purify + /* + Don't print debug messages when running valgrind since they can + trigger false warnings. + */ + DBUG_PRINT("info",("Updating row in table")); + DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength); + DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength); +#endif + + error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]); if (error == HA_ERR_RECORD_IS_THE_SAME) error= 0; return error; } + #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ #ifdef MYSQL_CLIENT |