Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--  sql/log_event.cc  905
1 file changed, 412 insertions, 493 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index fecc35c95b9..2142aa0b54e 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -456,7 +456,11 @@ Log_event::Log_event()
thd(0)
{
server_id= ::server_id;
- when= my_time(0);
+ /*
+ We can't call my_time() here as this would cause a call before
+ my_init() is called
+ */
+ when= 0;
log_pos= 0;
}
#endif /* !MYSQL_CLIENT */
@@ -637,6 +641,7 @@ void Log_event::init_show_field_list(List<Item>* field_list)
bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
{
uchar header[LOG_EVENT_HEADER_LEN];
+ ulong now;
DBUG_ENTER("Log_event::write_header");
/* Store number of bytes that will be written by this event */
@@ -687,6 +692,8 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
log_pos= my_b_safe_tell(file)+data_written;
}
+ now= (ulong) get_time(); // Query start time
+
/*
Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be
@@ -694,7 +701,7 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
because we read them before knowing the format).
*/
- int4store(header, (ulong) when); // timestamp
+ int4store(header, now); // timestamp
header[EVENT_TYPE_OFFSET]= get_type_code();
int4store(header+ SERVER_ID_OFFSET, server_id);
int4store(header+ EVENT_LEN_OFFSET, data_written);
@@ -803,10 +810,12 @@ end:
#ifndef MYSQL_CLIENT
Log_event* Log_event::read_log_event(IO_CACHE* file,
pthread_mutex_t* log_lock,
- const Format_description_log_event *description_event)
+ const Format_description_log_event
+ *description_event)
#else
Log_event* Log_event::read_log_event(IO_CACHE* file,
- const Format_description_log_event *description_event)
+ const Format_description_log_event
+ *description_event)
#endif
{
DBUG_ENTER("Log_event::read_log_event");
@@ -1461,7 +1470,7 @@ Query_log_event::Query_log_event()
/*
SYNOPSIS
Query_log_event::Query_log_event()
- thd - thread handle
+ thd_arg - thread handle
query_arg - array of char representing the query
query_length - size of the `query_arg' array
using_trans - there is a modified transactional table
@@ -1477,10 +1486,12 @@ Query_log_event::Query_log_event()
*/
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
ulong query_length, bool using_trans,
- bool suppress_use, THD::killed_state killed_status_arg)
+ bool suppress_use,
+ THD::killed_state killed_status_arg)
:Log_event(thd_arg,
- (thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0) |
- (suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
+ (thd_arg->thread_specific_used ? LOG_EVENT_THREAD_SPECIFIC_F :
+ 0) |
+ (suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
using_trans),
data_buf(0), query(query_arg), catalog(thd_arg->catalog),
db(thd_arg->db), q_len((uint32) query_length),
@@ -1501,10 +1512,10 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
error_code=
(killed_status_arg == THD::NOT_KILLED) ? thd_arg->net.last_errno :
((thd_arg->system_thread & SYSTEM_THREAD_DELAYED_INSERT) ? 0 :
- thd->killed_errno());
+ thd_arg->killed_errno());
time(&end_time);
- exec_time = (ulong) (end_time - thd->start_time);
+ exec_time = (ulong) (end_time - thd_arg->start_time);
catalog_len = (catalog) ? (uint32) strlen(catalog) : 0;
/* status_vars_len is set just before writing the event */
db_len = (db) ? (uint32) strlen(db) : 0;
@@ -1513,15 +1524,15 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
/*
If we don't use flags2 for anything else than options contained in
- thd->options, it would be more efficient to flags2=thd_arg->options
+ thd_arg->options, it would be more efficient to flags2=thd_arg->options
(OPTIONS_WRITTEN_TO_BINLOG would be used only at reading time).
But it's likely that we don't want to use 32 bits for 3 bits; in the future
we will probably want to reclaim the 29 bits. So we need the &.
*/
flags2= (uint32) (thd_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG);
- DBUG_ASSERT(thd->variables.character_set_client->number < 256*256);
- DBUG_ASSERT(thd->variables.collation_connection->number < 256*256);
- DBUG_ASSERT(thd->variables.collation_server->number < 256*256);
+ DBUG_ASSERT(thd_arg->variables.character_set_client->number < 256*256);
+ DBUG_ASSERT(thd_arg->variables.collation_connection->number < 256*256);
+ DBUG_ASSERT(thd_arg->variables.collation_server->number < 256*256);
int2store(charset, thd_arg->variables.character_set_client->number);
int2store(charset+2, thd_arg->variables.collation_connection->number);
int2store(charset+4, thd_arg->variables.collation_server->number);
@@ -2247,9 +2258,10 @@ Muted_query_log_event::Muted_query_log_event()
**************************************************************************/
#ifndef MYSQL_CLIENT
-Start_log_event_v3::Start_log_event_v3() :Log_event(), binlog_version(BINLOG_VERSION), artificial_event(0)
+Start_log_event_v3::Start_log_event_v3()
+ :Log_event(), created(0), binlog_version(BINLOG_VERSION),
+ artificial_event(0), dont_set_created(0)
{
- created= when;
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
}
#endif
@@ -2319,7 +2331,8 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
*/
Start_log_event_v3::Start_log_event_v3(const char* buf,
- const Format_description_log_event* description_event)
+ const Format_description_log_event
+ *description_event)
:Log_event(buf, description_event)
{
buf+= description_event->common_header_len;
@@ -2331,6 +2344,7 @@ Start_log_event_v3::Start_log_event_v3(const char* buf,
created= uint4korr(buf+ST_CREATED_OFFSET);
/* We use log_pos to mark if this was an artificial event or not */
artificial_event= (log_pos == 0);
+ dont_set_created= 1;
}
@@ -2344,6 +2358,8 @@ bool Start_log_event_v3::write(IO_CACHE* file)
char buff[START_V3_HEADER_LEN];
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
+ if (!dont_set_created)
+ created= when= get_time();
int4store(buff + ST_CREATED_OFFSET,created);
return (write_header(file, sizeof(buff)) ||
my_b_safe_write(file, (uchar*) buff, sizeof(buff)));
@@ -2374,8 +2390,7 @@ bool Start_log_event_v3::write(IO_CACHE* file)
int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
{
DBUG_ENTER("Start_log_event_v3::do_apply_event");
- switch (binlog_version)
- {
+ switch (binlog_version) {
case 3:
case 4:
/*
@@ -2446,7 +2461,6 @@ Format_description_log_event::
Format_description_log_event(uint8 binlog_ver, const char* server_ver)
:Start_log_event_v3()
{
- created= when;
binlog_version= binlog_ver;
switch (binlog_ver) {
case 4: /* MySQL 5.0 */
@@ -2598,6 +2612,8 @@ bool Format_description_log_event::write(IO_CACHE* file)
uchar buff[FORMAT_DESCRIPTION_HEADER_LEN];
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
memcpy((char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
+ if (!dont_set_created)
+ created= when= get_time();
int4store(buff + ST_CREATED_OFFSET,created);
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET+1, (uchar*) post_header_len,
@@ -4904,8 +4920,9 @@ err:
*/
#ifndef MYSQL_CLIENT
-Append_block_log_event::Append_block_log_event(THD* thd_arg, const char* db_arg,
- char* block_arg,
+Append_block_log_event::Append_block_log_event(THD *thd_arg,
+ const char *db_arg,
+ char *block_arg,
uint block_len_arg,
bool using_trans)
:Log_event(thd_arg,0, using_trans), block(block_arg),
@@ -5154,7 +5171,8 @@ int Delete_file_log_event::do_apply_event(Relay_log_info const *rli)
*/
#ifndef MYSQL_CLIENT
-Execute_load_log_event::Execute_load_log_event(THD *thd_arg, const char* db_arg,
+Execute_load_log_event::Execute_load_log_event(THD *thd_arg,
+ const char* db_arg,
bool using_trans)
:Log_event(thd_arg, 0, using_trans), file_id(thd_arg->file_id), db(db_arg)
{
@@ -5355,7 +5373,7 @@ int Begin_load_query_log_event::get_create_or_append() const
#ifndef MYSQL_CLIENT
Execute_load_query_log_event::
-Execute_load_query_log_event(THD* thd_arg, const char* query_arg,
+Execute_load_query_log_event(THD *thd_arg, const char* query_arg,
ulong query_length_arg, uint fn_pos_start_arg,
uint fn_pos_end_arg,
enum_load_dup_handling dup_handling_arg,
@@ -5636,13 +5654,14 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
m_table_id(tid),
m_width(tbl_arg ? tbl_arg->s->fields : 1),
m_rows_buf(0), m_rows_cur(0), m_rows_end(0),
- m_flags(0)
+ m_curr_row(NULL), m_curr_row_end(NULL),
+ m_flags(0), m_key(NULL)
{
/*
We allow a special form of dummy event when the table, and cols
are null and the table id is ~0UL. This is a temporary
solution, to be able to terminate a started statement in the
- binary log: the extreneous events will be removed in the future.
+ binary log: the extraneous events will be removed in the future.
*/
DBUG_ASSERT(tbl_arg && tbl_arg->s && tid != ~0UL ||
!tbl_arg && !cols && tid == ~0UL);
@@ -5651,7 +5670,7 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
set_flags(NO_FOREIGN_KEY_CHECKS_F);
if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
set_flags(RELAXED_UNIQUE_CHECKS_F);
- /* if bitmap_init fails, catched in is_valid() */
+ /* if bitmap_init fails, caught in is_valid() */
if (likely(!bitmap_init(&m_cols,
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
m_width,
@@ -5678,7 +5697,10 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
*description_event)
: Log_event(buf, description_event),
m_row_count(0),
- m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
+ m_table(NULL),
+ m_rows_buf(0), m_rows_cur(0), m_rows_end(0),
+ m_curr_row(NULL), m_curr_row_end(NULL),
+ m_key(NULL)
{
DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)");
uint8 const common_header_len= description_event->common_header_len;
@@ -5737,7 +5759,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
{
DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
- /* if bitmap_init fails, catched in is_valid() */
+ /* if bitmap_init fails, caught in is_valid() */
if (likely(!bitmap_init(&m_cols_ai,
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
m_width,
@@ -5767,6 +5789,7 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
m_rows_buf= (uchar*) my_malloc(data_size, MYF(MY_WME));
if (likely((bool)m_rows_buf))
{
+ m_curr_row= m_rows_buf;
m_rows_end= m_rows_buf + data_size;
m_rows_cur= m_rows_end;
memcpy(m_rows_buf, ptr_rows_data, data_size);
@@ -5871,7 +5894,6 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
{
DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
int error= 0;
- uchar const *row_start= m_rows_buf;
/*
If m_table_id == ~0UL, then we have a dummy event that does not
@@ -6031,7 +6053,9 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
#endif
}
- TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
+ TABLE*
+ table=
+ m_table= const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(m_table_id);
if (table)
{
@@ -6076,24 +6100,43 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
inside a statement and halting abruptly might cause problems
when restarting.
*/
- const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
+ const_cast<RELAY_LOG_INFO*>(rli)->set_flag(RELAY_LOG_INFO::IN_STMT);
- error= do_before_row_operations(table);
- while (error == 0 && row_start < m_rows_end)
- {
- uchar const *row_end= NULL;
- if ((error= do_prepare_row(thd, rli, table, row_start, &row_end)))
- break; // We should perform the after-row operation even in
- // the case of error
+ if (m_width == table->s->fields && bitmap_is_set_all(&m_cols))
+ set_flags(COMPLETE_ROWS_F);
+
+ /*
+ Set table's write and read sets.
+
+ Read_set contains all slave columns (in case we are going to fetch
+ a complete record from slave)
+
+ Write_set equals the m_cols bitmap sent from master but it can be
+ longer if slave has extra columns.
+ */
+
+ DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
+
+ bitmap_set_all(table->read_set);
+ bitmap_set_all(table->write_set);
+ if (!get_flags(COMPLETE_ROWS_F))
+ bitmap_intersect(table->write_set,&m_cols);
+
+ // Do event specific preparations
+
+ error= do_before_row_operations(rli);
- DBUG_ASSERT(row_end != NULL); // cannot happen
- DBUG_ASSERT(row_end <= m_rows_end);
+ // row processing loop
+ while (error == 0 && m_curr_row < m_rows_end)
+ {
/* in_use can have been set to NULL in close_tables_for_reopen */
THD* old_thd= table->in_use;
if (!table->in_use)
table->in_use= thd;
- error= do_exec_row(table);
+
+ error= do_exec_row(rli);
+
table->in_use = old_thd;
switch (error)
{
@@ -6114,21 +6157,38 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
break;
}
- row_start= row_end;
- }
+ /*
+ If m_curr_row_end was not set during event execution (e.g., because
+ of errors) we can't proceed to the next row. If the error is transient
+ (i.e., error==0 at this point) we must call unpack_current_row() to set
+ m_curr_row_end.
+ */
+
+ if (!m_curr_row_end && !error)
+ unpack_current_row(rli);
+
+ // at this moment m_curr_row_end should be set
+ DBUG_ASSERT(error || m_curr_row_end != NULL);
+ DBUG_ASSERT(error || m_curr_row < m_curr_row_end);
+ DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
+
+ m_curr_row= m_curr_row_end;
+
+ } // row processing loop
+
DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event",
const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
- error= do_after_row_operations(table, error);
+ error= do_after_row_operations(rli, error);
if (!cache_stmt)
{
DBUG_PRINT("info", ("Marked that we need to keep log"));
thd->options|= OPTION_KEEP_LOG;
}
- }
+ } // if (table)
/*
- We need to delay this clear until the table def is no longer needed.
- The table def is needed in unpack_row().
+ We need to delay this clear until the table def stored in m_table_def is no
+ longer needed. It is used in unpack_current_row().
*/
if (rli->tables_to_lock && get_flags(STMT_END_F))
const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
@@ -6474,7 +6534,7 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
m_data_size+= m_tbllen + 2; // Include length and terminating \0
m_data_size+= 1 + m_colcnt; // COLCNT and column types
- /* If malloc fails, catched in is_valid() */
+ /* If malloc fails, caught in is_valid() */
if ((m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME))))
{
m_coltype= reinterpret_cast<uchar*>(m_memory);
@@ -6593,7 +6653,7 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
(ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart),
m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart)));
- /* Allocate mem for all fields in one go. If fails, catched in is_valid() */
+ /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
&m_dbnam, (uint) m_dblen + 1,
&m_tblnam, (uint) m_tbllen + 1,
@@ -6919,7 +6979,8 @@ Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Write_rows_log_event::do_before_row_operations(TABLE *table)
+int
+Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
{
int error= 0;
@@ -6941,26 +7002,26 @@ int Write_rows_log_event::do_before_row_operations(TABLE *table)
/*
Do not raise the error flag in case of hitting to an unique attribute
*/
- table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
+ m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
/*
NDB specific: update from ndb master wrapped as Write_rows
*/
/*
so that the event should be applied to replace slave's row
*/
- table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
+ m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
/*
NDB specific: if update from ndb master wrapped as Write_rows
does not find the row it's assumed idempotent binlog applying
is taking place; don't raise the error.
*/
- table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
+ m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
/*
TODO: the cluster team (Tomas?) says that it's better if the engine knows
how many rows are going to be inserted, then it can allocate needed memory
from the start.
*/
- table->file->ha_start_bulk_insert(0);
+ m_table->file->ha_start_bulk_insert(0);
/*
We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
any TIMESTAMP column with data from the row but instead will use
@@ -6976,45 +7037,31 @@ int Write_rows_log_event::do_before_row_operations(TABLE *table)
some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
analyze if explicit data is provided for slave's TIMESTAMP columns).
*/
- table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
+ m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
return error;
}
-int Write_rows_log_event::do_after_row_operations(TABLE *table, int error)
+int
+Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
+ int error)
{
int local_error= 0;
- table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
- table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
+ m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+ m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
/*
reseting the extra with
table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY);
fires bug#27077
todo: explain or fix
*/
- if ((local_error= table->file->ha_end_bulk_insert()))
+ if ((local_error= m_table->file->ha_end_bulk_insert()))
{
- table->file->print_error(local_error, MYF(0));
+ m_table->file->print_error(local_error, MYF(0));
}
return error? error : local_error;
}
-int Write_rows_log_event::do_prepare_row(THD *thd, Relay_log_info const *rli,
- TABLE *table,
- uchar const *const row_start,
- uchar const **const row_end)
-{
- DBUG_ASSERT(table != NULL);
- DBUG_ASSERT(row_start && row_end);
-
- if (int error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end,
- &m_master_reclength, table->write_set, WRITE_ROWS_EVENT))
- {
- thd->net.last_errno= error;
- return error;
- }
- bitmap_copy(table->read_set, table->write_set);
- return 0;
-}
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
/*
Check if there are more UNIQUE keys after the given key.
@@ -7028,135 +7075,6 @@ last_uniq_key(TABLE *table, uint keyno)
return 1;
}
-/* Anonymous namespace for template functions/classes */
-namespace {
-
- /*
- Smart pointer that will automatically call my_afree (a macro) when
- the pointer goes out of scope. This is used so that I do not have
- to remember to call my_afree() before each return. There is no
- overhead associated with this, since all functions are inline.
-
- I (Matz) would prefer to use the free function as a template
- parameter, but that is not possible when the "function" is a
- macro.
- */
- template <class Obj>
- class auto_afree_ptr
- {
- Obj* m_ptr;
- public:
- auto_afree_ptr(Obj* ptr) : m_ptr(ptr) { }
- ~auto_afree_ptr() { if (m_ptr) my_afree(m_ptr); }
- void assign(Obj* ptr) {
- /* Only to be called if it hasn't been given a value before. */
- DBUG_ASSERT(m_ptr == NULL);
- m_ptr= ptr;
- }
- Obj* get() { return m_ptr; }
- };
-
-}
-
-
-/*
- Copy "extra" columns from record[1] to record[0].
-
- Copy the extra fields that are not present on the master but are
- present on the slave from record[1] to record[0]. This is used
- after fetching a record that are to be updated, either inside
- replace_record() or as part of executing an update_row().
- */
-static int
-copy_extra_record_fields(TABLE *table,
- size_t master_reclength,
- my_ptrdiff_t master_fields)
-{
- DBUG_ENTER("copy_extra_record_fields(table, master_reclen, master_fields)");
- DBUG_PRINT("info", ("Copying to 0x%lx "
- "from field %lu at offset %lu "
- "to field %d at offset %lu",
- (long) table->record[0],
- (ulong) master_fields, (ulong) master_reclength,
- table->s->fields, table->s->reclength));
- /*
- Copying the extra fields of the slave that does not exist on
- master into record[0] (which are basically the default values).
- */
-
- if (table->s->fields < (uint) master_fields)
- DBUG_RETURN(0);
-
- DBUG_ASSERT(master_reclength <= table->s->reclength);
- if (master_reclength < table->s->reclength)
- bmove_align(table->record[0] + master_reclength,
- table->record[1] + master_reclength,
- table->s->reclength - master_reclength);
-
- /*
- Bit columns are special. We iterate over all the remaining
- columns and copy the "extra" bits to the new record. This is
- not a very good solution: it should be refactored on
- opportunity.
-
- REFACTORING SUGGESTION (Matz). Introduce a member function
- similar to move_field_offset() called copy_field_offset() to
- copy field values and implement it for all Field subclasses. Use
- this function to copy data from the found record to the record
- that are going to be inserted.
-
- The copy_field_offset() function need to be a virtual function,
- which in this case will prevent copying an entire range of
- fields efficiently.
- */
- {
- Field **field_ptr= table->field + master_fields;
- for ( ; *field_ptr ; ++field_ptr)
- {
- /*
- Set the null bit according to the values in record[1]
- */
- if ((*field_ptr)->maybe_null() &&
- (*field_ptr)->is_null_in_record(reinterpret_cast<uchar*>(table->record[1])))
- (*field_ptr)->set_null();
- else
- (*field_ptr)->set_notnull();
-
- /*
- Do the extra work for special columns.
- */
- switch ((*field_ptr)->real_type())
- {
- default:
- /* Nothing to do */
- break;
-
- case MYSQL_TYPE_BIT:
- Field_bit *f= static_cast<Field_bit*>(*field_ptr);
- if (f->bit_len > 0)
- {
- my_ptrdiff_t const offset= table->record[1] - table->record[0];
- uchar const bits=
- get_rec_bits(f->bit_ptr + offset, f->bit_ofs, f->bit_len);
- set_rec_bits(bits, f->bit_ptr, f->bit_ofs, f->bit_len);
- }
- break;
- }
- }
- }
- DBUG_RETURN(0); // All OK
-}
-
-#define DBUG_PRINT_BITSET(N,FRM,BS) \
- do { \
- char buf[256]; \
- for (uint i = 0 ; i < (BS)->n_bits ; ++i) \
- buf[i] = bitmap_is_set((BS), i) ? '1' : '0'; \
- buf[(BS)->n_bits] = '\0'; \
- DBUG_PRINT((N), ((FRM), buf)); \
- } while (0)
-
-
/**
Check if an error is a duplicate key error.
@@ -7182,45 +7100,76 @@ is_duplicate_key_error(int errcode)
return false;
}
+/**
+ Write the current row into event's table.
-/*
- Replace the provided record in the database.
+ The row is located in the row buffer, pointed by @c m_curr_row member.
+ Number of columns of the row is stored in @c m_width member (it can be
+ different from the number of columns in the table to which we insert).
+ Bitmap @c m_cols indicates which columns are present in the row. It is assumed
+ that event's table is already open and pointed by @c m_table.
- SYNOPSIS
- replace_record()
- thd Thread context for writing the record.
- table Table to which record should be written.
- master_reclength
- Offset to first column that is not present on the master,
- alternatively the length of the record on the master
- side.
+ If the same record already exists in the table it can be either overwritten
+ or an error is reported depending on the value of @c overwrite flag
+ (error reporting not yet implemented). Note that the matching record can be
+ different from the row we insert if we use primary keys to identify records in
+ the table.
- RETURN VALUE
- Error code on failure, 0 on success.
+ The row to be inserted can contain values only for selected columns. The
+ missing columns are filled with default values using @c prepare_record()
+ function. If a matching record is found in the table and @c overwrite is
+ true, the missing columns are taken from it.
- DESCRIPTION
- Similar to how it is done in mysql_insert(), we first try to do
- a ha_write_row() and of that fails due to duplicated keys (or
- indices), we do an ha_update_row() or a ha_delete_row() instead.
- */
-static int
-replace_record(THD *thd, TABLE *table,
- ulong const master_reclength,
- uint const master_fields)
+ @param rli Relay log info (needed for row unpacking).
+ @param overwrite
+ Shall we overwrite if the row already exists or signal
+ error (currently ignored).
+
+ @returns Error code on failure, 0 on success.
+
+ This method, if successful, sets @c m_curr_row_end pointer to point at the
+ next row in the rows buffer. This is done when unpacking the row to be
+ inserted.
+
+ @note If a matching record is found, it is either updated using
+ @c ha_update_row() or first deleted and then the new record is written.
+*/
+
+int
+Rows_log_event::write_row(const RELAY_LOG_INFO *const rli,
+ const bool overwrite)
{
- DBUG_ENTER("replace_record");
- DBUG_ASSERT(table != NULL && thd != NULL);
+ DBUG_ENTER("write_row");
+ DBUG_ASSERT(m_table != NULL && thd != NULL);
+ TABLE *table= m_table; // pointer to event's table
int error;
int keynum;
auto_afree_ptr<char> key(NULL);
+ /* fill table->record[0] with default values */
+
+ if ((error= prepare_record(rli, table, m_width,
+ TRUE /* check if columns have def. values */)))
+ DBUG_RETURN(error);
+
+ /* unpack row into table->record[0] */
+ error= unpack_current_row(rli); // TODO: how to handle errors?
+
#ifndef DBUG_OFF
DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
#endif
+ /*
+ Try to write record. If a corresponding record already exists in the table,
+ we try to change it using ha_update_row() if possible. Otherwise we delete
+ it and repeat the whole process again.
+
+ TODO: Add safety measures against infinite looping.
+ */
+
while ((error= table->file->ha_write_row(table->record[0])))
{
if (error == HA_ERR_LOCK_DEADLOCK || error == HA_ERR_LOCK_WAIT_TIMEOUT)
@@ -7230,6 +7179,7 @@ replace_record(THD *thd, TABLE *table,
}
if ((keynum= table->file->get_dup_key(error)) < 0)
{
+ DBUG_PRINT("info",("Can't locate duplicate key (get_dup_key returns %d)",keynum));
table->file->print_error(error, MYF(0));
/*
We failed to retrieve the duplicate key
@@ -7251,17 +7201,22 @@ replace_record(THD *thd, TABLE *table,
*/
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
{
+ DBUG_PRINT("info",("Locating offending record using rnd_pos()"));
error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
if (error)
{
+ DBUG_PRINT("info",("rnd_pos() returns error %d",error));
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
}
else
{
+ DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
+
if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
{
+ DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
DBUG_RETURN(my_errno);
}
@@ -7269,30 +7224,47 @@ replace_record(THD *thd, TABLE *table,
{
key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
if (key.get() == NULL)
+ {
+ DBUG_PRINT("info",("Can't allocate key buffer"));
DBUG_RETURN(ENOMEM);
+ }
}
- key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum, 0);
- error= table->file->index_read_idx(table->record[1], keynum,
- (const uchar*)key.get(),
- HA_WHOLE_KEY,
- HA_READ_KEY_EXACT);
+ key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
+ 0);
+ error= table->file->index_read_idx_map(table->record[1], keynum,
+ (const uchar*)key.get(),
+ HA_WHOLE_KEY,
+ HA_READ_KEY_EXACT);
if (error)
{
+ DBUG_PRINT("info",("index_read_idx() returns error %d",error));
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
}
/*
- Now, table->record[1] should contain the offending row. That
+ Now, record[1] should contain the offending row. That
will enable us to update it or, alternatively, delete it (so
that we can insert the new row afterwards).
+ */
- First we copy the columns into table->record[0] that are not
- present on the master from table->record[1], if there are any.
+ /*
+ If row is incomplete we will use the record found to fill
+ missing columns.
*/
- copy_extra_record_fields(table, master_reclength, master_fields);
+ if (!get_flags(COMPLETE_ROWS_F))
+ {
+ restore_record(table,record[1]);
+ error= unpack_current_row(rli);
+ }
+
+#ifndef DBUG_OFF
+ DBUG_PRINT("debug",("preparing for update: before and after image"));
+ DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
+ DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
+#endif
/*
REPLACE is defined as either INSERT or DELETE + INSERT. If
@@ -7312,18 +7284,32 @@ replace_record(THD *thd, TABLE *table,
if (last_uniq_key(table, keynum) &&
!table->file->referenced_by_foreign_key())
{
+ DBUG_PRINT("info",("Updating row using ha_update_row()"));
error=table->file->ha_update_row(table->record[1],
table->record[0]);
- if (error && error != HA_ERR_RECORD_IS_THE_SAME)
- table->file->print_error(error, MYF(0));
- else
+ switch (error) {
+
+ case HA_ERR_RECORD_IS_THE_SAME:
+ DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
+ " ha_update_row()"));
error= 0;
+
+ case 0:
+ break;
+
+ default:
+ DBUG_PRINT("info",("ha_update_row() returns error %d",error));
+ table->file->print_error(error, MYF(0));
+ }
+
DBUG_RETURN(error);
}
else
{
+ DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
if ((error= table->file->ha_delete_row(table->record[1])))
{
+ DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
@@ -7334,12 +7320,20 @@ replace_record(THD *thd, TABLE *table,
DBUG_RETURN(error);
}
-int Write_rows_log_event::do_exec_row(TABLE *table)
+#endif
+
+int
+Write_rows_log_event::do_exec_row(const RELAY_LOG_INFO *const rli)
{
- DBUG_ASSERT(table != NULL);
- int error= replace_record(thd, table, m_master_reclength, m_width);
- return error;
+ DBUG_ASSERT(m_table != NULL);
+ int error= write_row(rli, TRUE /* overwrite */);
+
+ if (error && !thd->net.last_errno)
+ thd->net.last_errno= error;
+
+ return error;
}
+
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
#ifdef MYSQL_CLIENT
@@ -7432,40 +7426,52 @@ record_compare_exit:
return result;
}
+/**
+ Locate the current row in event's table.
-/*
- Find the row given by 'key', if the table has keys, or else use a table scan
- to find (and fetch) the row.
-
- If the engine allows random access of the records, a combination of
- position() and rnd_pos() will be used.
+ The current row is pointed by @c m_curr_row. Member @c m_width tells how many
+ columns are there in the row (this can be different from the number of columns
+ in the table). It is assumed that event's table is already open and pointed
+ by @c m_table.
- @param table Pointer to table to search
- @param key Pointer to key to use for search, if table has key
+ If a corresponding record is found in the table it is stored in
+ @c m_table->record[0]. Note that when record is located based on a primary
+ key, it is possible that the record found differs from the row being located.
- @pre <code>table->record[0]</code> shall contain the row to locate
- and <code>key</code> shall contain a key to use for searching, if
- the engine has a key.
+ If no key is specified or table does not have keys, a table scan is used to
+ find the row. In that case the row should be complete and contain values for
+ all columns. However, it can still be shorter than the table, i.e. the table
+ can contain extra columns not present in the row. It is also possible that
+ the table has fewer columns than the row being located.
- @post If the return value is zero, <code>table->record[1]</code>
- will contain the fetched row and the internal "cursor" will refer to
- the row. If the return value is non-zero,
- <code>table->record[1]</code> is undefined. In either case,
- <code>table->record[0]</code> is undefined.
+ @returns Error code on failure, 0 on success.
+
+ @post In case of success @c m_table->record[0] contains the record found.
+ Also, the internal "cursor" of the table is positioned at the record found.
- @return Zero if the row was successfully fetched into
- <code>table->record[1]</code>, error code otherwise.
+ @note If the engine allows random access of the records, a combination of
+ @c position() and @c rnd_pos() will be used.
*/
-static int find_and_fetch_row(TABLE *table, uchar *key)
+int Rows_log_event::find_row(const RELAY_LOG_INFO *rli)
{
- DBUG_ENTER("find_and_fetch_row(TABLE *table, uchar *key, uchar *record)");
- DBUG_PRINT("enter", ("table: 0x%lx, key: 0x%lx record: 0x%lx",
- (long) table, (long) key, (long) table->record[1]));
+ DBUG_ENTER("find_row");
+
+ DBUG_ASSERT(m_table && m_table->in_use != NULL);
+
+ TABLE *table= m_table;
+ int error;
+
+ /* unpack row - missing fields get default values */
- DBUG_ASSERT(table->in_use != NULL);
+ // TODO: shall we check and report errors here?
+ prepare_record(NULL,table,m_width,FALSE /* don't check errors */);
+ error= unpack_current_row(rli);
+#ifndef DBUG_OFF
+ DBUG_PRINT("info",("looking for the following record"));
DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
+#endif
if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
table->s->primary_key < MAX_KEY)
@@ -7488,34 +7494,48 @@ static int find_and_fetch_row(TABLE *table, uchar *key)
table->s->reclength) == 0);
*/
+ DBUG_PRINT("info",("locating record using primary key (position)"));
table->file->position(table->record[0]);
- int error= table->file->rnd_pos(table->record[0], table->file->ref);
- /*
- rnd_pos() returns the record in table->record[0], so we have to
- move it to table->record[1].
- */
- bmove_align(table->record[1], table->record[0], table->s->reclength);
+ error= table->file->rnd_pos(table->record[0], table->file->ref);
+ if (error)
+ {
+ DBUG_PRINT("info",("rnd_pos returns error %d",error));
+ table->file->print_error(error, MYF(0));
+ }
DBUG_RETURN(error);
}
- /* We need to retrieve all fields */
- /* TODO: Move this out from this function to main loop */
+ // We can't use position() - try other methods.
+
+ /*
+ We need to retrieve all fields
+ TODO: Move this out from this function to main loop
+ */
table->use_all_columns();
if (table->s->keys > 0)
{
- int error;
+ DBUG_PRINT("info",("locating record using primary key (index_read)"));
+
/* We have a key: search the table using the index */
if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
+ {
+ DBUG_PRINT("info",("ha_index_init returns error %d",error));
+ table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
+ }
- /*
- Don't print debug messages when running valgrind since they can
- trigger false warnings.
- */
+ /* Fill key data for the row */
+
+ DBUG_ASSERT(m_key);
+ key_copy(m_key, table->record[0], table->key_info, 0);
+
+ /*
+ Don't print debug messages when running valgrind since they can
+ trigger false warnings.
+ */
#ifndef HAVE_purify
- DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
- DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
+ DBUG_DUMP("key data", m_key, table->key_info->key_length);
#endif
/*
@@ -7527,9 +7547,11 @@ static int find_and_fetch_row(TABLE *table, uchar *key)
my_ptrdiff_t const pos=
table->s->null_bytes > 0 ? table->s->null_bytes - 1 : 0;
table->record[1][pos]= 0xFF;
- if ((error= table->file->index_read(table->record[1], key, HA_WHOLE_KEY,
- HA_READ_KEY_EXACT)))
+ if ((error= table->file->index_read_map(table->record[1], m_key,
+ HA_WHOLE_KEY,
+ HA_READ_KEY_EXACT)))
{
+ DBUG_PRINT("info",("no record matching the key found in the table"));
table->file->print_error(error, MYF(0));
table->file->ha_index_end();
DBUG_RETURN(error);
@@ -7540,8 +7562,8 @@ static int find_and_fetch_row(TABLE *table, uchar *key)
trigger false warnings.
*/
#ifndef HAVE_purify
- DBUG_DUMP("table->record[0]", table->record[0], table->s->reclength);
- DBUG_DUMP("table->record[1]", table->record[1], table->s->reclength);
+ DBUG_PRINT("info",("found first matching record"));
+ DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
#endif
/*
Below is a minor "optimization". If the key (i.e., key number
@@ -7563,10 +7585,15 @@ static int find_and_fetch_row(TABLE *table, uchar *key)
DBUG_RETURN(0);
}
+ /*
+ In case key is not unique, we still have to iterate over records found
+ and find the one which is identical to the row given. The row is unpacked
+ in record[1] where missing columns are filled with default values.
+ */
+ DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
+
while (record_compare(table))
{
- int error;
-
/*
We need to set the null bytes to ensure that the filler bit
are all set when returning. There are storage engines that
@@ -7584,9 +7611,10 @@ static int find_and_fetch_row(TABLE *table, uchar *key)
if ((error= table->file->index_next(table->record[1])))
{
- table->file->print_error(error, MYF(0));
+ DBUG_PRINT("info",("no record matching the given row found"));
+ table->file->print_error(error, MYF(0));
table->file->ha_index_end();
- DBUG_RETURN(error);
+ DBUG_RETURN(error);
}
}
@@ -7597,44 +7625,57 @@ static int find_and_fetch_row(TABLE *table, uchar *key)
}
else
{
+ DBUG_PRINT("info",("locating record using table scan (rnd_next)"));
+
int restart_count= 0; // Number of times scanning has restarted from top
- int error;
/* We don't have a key: search the table using rnd_next() */
if ((error= table->file->ha_rnd_init(1)))
- return error;
+ {
+ DBUG_PRINT("info",("error initializing table scan"
+ " (ha_rnd_init returns %d)",error));
+ table->file->print_error(error, MYF(0));
+ DBUG_RETURN(error);
+ }
/* Continue until we find the right record or have made a full loop */
do
{
error= table->file->rnd_next(table->record[1]);
- DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
- DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
-
switch (error) {
+
case 0:
+ DBUG_DUMP("record found", table->record[0], table->s->reclength);
+
case HA_ERR_RECORD_DELETED:
- break;
+ break;
case HA_ERR_END_OF_FILE:
- if (++restart_count < 2)
- table->file->ha_rnd_init(1);
- break;
+ if (++restart_count < 2)
+ table->file->ha_rnd_init(1);
+ break;
default:
- table->file->print_error(error, MYF(0));
- DBUG_PRINT("info", ("Record not found"));
+ DBUG_PRINT("info", ("Failed to get next record"
+ " (rnd_next returns %d)",error));
+ table->file->print_error(error, MYF(0));
table->file->ha_rnd_end();
- DBUG_RETURN(error);
+ DBUG_RETURN(error);
}
}
while (restart_count < 2 && record_compare(table));
+
+ /*
+ Note: above record_compare will take into account all record fields
+ which might be incorrect in case a partial row was given in the event
+ */
/*
Have to restart the scan to be able to fetch the next row.
*/
- DBUG_PRINT("info", ("Record %sfound", restart_count == 2 ? "not " : ""));
+ if (restart_count == 2)
+ DBUG_PRINT("info", ("Record not found"));
table->file->ha_rnd_end();
DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
@@ -7643,6 +7684,7 @@ static int find_and_fetch_row(TABLE *table, uchar *key)
DBUG_RETURN(0);
}
+
#endif
/*
@@ -7654,9 +7696,6 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
ulong tid, MY_BITMAP const *cols,
bool is_transactional)
: Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional)
-#ifdef HAVE_REPLICATION
- ,m_memory(NULL), m_key(NULL), m_after_image(NULL)
-#endif
{
}
#endif /* #if !defined(MYSQL_CLIENT) */
@@ -7668,23 +7707,18 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len,
const Format_description_log_event
*description_event)
-#if defined(MYSQL_CLIENT)
: Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
-#else
- : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event),
- m_memory(NULL), m_key(NULL), m_after_image(NULL)
-#endif
{
}
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Delete_rows_log_event::do_before_row_operations(TABLE *table)
-{
- DBUG_ASSERT(m_memory == NULL);
- if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
- table->s->primary_key < MAX_KEY)
+int
+Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
+{
+ if ((m_table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION) &&
+ m_table->s->primary_key < MAX_KEY)
{
/*
We don't need to allocate any memory for m_after_image and
@@ -7693,81 +7727,39 @@ int Delete_rows_log_event::do_before_row_operations(TABLE *table)
return 0;
}
- int error= 0;
-
- if (table->s->keys > 0)
+ if (m_table->s->keys > 0)
{
- m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
- &m_after_image,
- (uint) table->s->reclength,
- &m_key,
- (uint) table->key_info->key_length,
- NullS);
+ // Allocate buffer for key searches
+ m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
+ if (!m_key)
+ return HA_ERR_OUT_OF_MEM;
}
- else
- {
- m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
- m_memory= (uchar*)m_after_image;
- m_key= NULL;
- }
- if (!m_memory)
- return HA_ERR_OUT_OF_MEM;
-
- return error;
+ return 0;
}
-int Delete_rows_log_event::do_after_row_operations(TABLE *table, int error)
+int
+Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
+ int error)
{
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
- table->file->ha_index_or_rnd_end();
- my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc
- m_memory= NULL;
- m_after_image= NULL;
+ m_table->file->ha_index_or_rnd_end();
+ my_free(m_key, MYF(MY_ALLOW_ZERO_PTR));
m_key= NULL;
return error;
}
-int Delete_rows_log_event::do_prepare_row(THD *thd, Relay_log_info const *rli,
- TABLE *table,
- uchar const *const row_start,
- uchar const **const row_end)
-{
- DBUG_ASSERT(row_start && row_end);
- if (int error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end,
- &m_master_reclength, table->read_set, DELETE_ROWS_EVENT))
- {
- thd->net.last_errno= error;
- return error;
- }
-
- /*
- If we will access rows using the random access method, m_key will
- be set to NULL, so we do not need to make a key copy in that case.
- */
- if (m_key)
- {
- KEY *const key_info= table->key_info;
-
- key_copy(m_key, table->record[0], key_info, 0);
- }
-
- return 0;
-}
-
-int Delete_rows_log_event::do_exec_row(TABLE *table)
+int Delete_rows_log_event::do_exec_row(const RELAY_LOG_INFO *const rli)
{
int error;
- DBUG_ASSERT(table != NULL);
+ DBUG_ASSERT(m_table != NULL);
- if (!(error= find_and_fetch_row(table, m_key)))
+ if (!(error= find_row(rli)))
{
/*
- Now we should have the right row to delete. We are using
- record[0] since it is guaranteed to point to a record with the
- correct value.
+ Delete the record found, located in record[0]
*/
- error= table->file->ha_delete_row(table->record[0]);
+ error= m_table->file->ha_delete_row(m_table->record[0]);
}
return error;
}
@@ -7797,10 +7789,6 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
MY_BITMAP const *cols_ai,
bool is_transactional)
: Rows_log_event(thd_arg, tbl_arg, tid, cols_bi, is_transactional)
-#ifdef HAVE_REPLICATION
- , m_memory(NULL), m_key(NULL)
-
-#endif
{
init(cols_ai);
}
@@ -7810,16 +7798,13 @@ Update_rows_log_event::Update_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
MY_BITMAP const *cols,
bool is_transactional)
: Rows_log_event(thd_arg, tbl_arg, tid, cols, is_transactional)
-#ifdef HAVE_REPLICATION
- , m_memory(NULL), m_key(NULL)
-#endif
{
init(cols);
}
void Update_rows_log_event::init(MY_BITMAP const *cols)
{
- /* if bitmap_init fails, catched in is_valid() */
+ /* if bitmap_init fails, caught in is_valid() */
if (likely(!bitmap_init(&m_cols_ai,
m_width <= sizeof(m_bitbuf_ai)*8 ? m_bitbuf_ai : NULL,
m_width,
@@ -7852,151 +7837,87 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len,
const
Format_description_log_event
*description_event)
-#if defined(MYSQL_CLIENT)
: Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
-#else
- : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event),
- m_memory(NULL), m_key(NULL)
-#endif
{
}
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Update_rows_log_event::do_before_row_operations(TABLE *table)
-{
- DBUG_ASSERT(m_memory == NULL);
-
- int error= 0;
- if (table->s->keys > 0)
- {
- m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
- &m_after_image,
- (uint) table->s->reclength,
- &m_key,
- (uint) table->key_info->key_length,
- NullS);
- }
- else
+int
+Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
+{
+ if (m_table->s->keys > 0)
{
- m_after_image= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME));
- m_memory= m_after_image;
- m_key= NULL;
+ // Allocate buffer for key searches
+ m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
+ if (!m_key)
+ return HA_ERR_OUT_OF_MEM;
}
- if (!m_memory)
- return HA_ERR_OUT_OF_MEM;
- table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
+ m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
- return error;
+ return 0;
}
-int Update_rows_log_event::do_after_row_operations(TABLE *table, int error)
+int
+Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
+ int error)
{
/*error= ToDo:find out what this should really be, this triggers close_scan in nbd, returning error?*/
- table->file->ha_index_or_rnd_end();
- my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR));
- m_memory= NULL;
- m_after_image= NULL;
+ m_table->file->ha_index_or_rnd_end();
+ my_free(m_key, MYF(MY_ALLOW_ZERO_PTR)); // Free for multi_malloc
m_key= NULL;
return error;
}
-int Update_rows_log_event::do_prepare_row(THD *thd, Relay_log_info const *rli,
- TABLE *table,
- uchar const *const row_start,
- uchar const **const row_end)
+int
+Update_rows_log_event::do_exec_row(const RELAY_LOG_INFO *const rli)
{
- int error;
- DBUG_ASSERT(row_start && row_end);
- /*
- We need to perform some juggling below since unpack_row() always
- unpacks into table->record[0]. For more information, see the
- comments for unpack_row().
- */
-
- /* record[0] is the before image for the update */
- if ((error= unpack_row(rli, table, m_width, row_start, &m_cols, row_end,
- &m_master_reclength, table->read_set, UPDATE_ROWS_EVENT)))
- {
- thd->net.last_errno= error;
- return error;
- }
+ DBUG_ASSERT(m_table != NULL);
- store_record(table, record[1]);
- uchar const *next_start = *row_end;
- /* m_after_image is the after image for the update */
- if ((error= unpack_row(rli, table, m_width, next_start, &m_cols_ai, row_end,
- &m_master_reclength, table->write_set, UPDATE_ROWS_EVENT)))
- {
- thd->net.last_errno= error;
+ int error= find_row(rli);
+ if (error)
return error;
- }
-
- bmove_align(m_after_image, table->record[0], table->s->reclength);
- restore_record(table, record[1]);
/*
- Don't print debug messages when running valgrind since they can
- trigger false warnings.
- */
-#ifndef HAVE_purify
- DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
- DBUG_DUMP("m_after_image", m_after_image, table->s->reclength);
-#endif
-
- /*
- If we will access rows using the random access method, m_key will
- be set to NULL, so we do not need to make a key copy in that case.
- */
- if (m_key)
- {
- KEY *const key_info= table->key_info;
-
- key_copy(m_key, table->record[0], key_info, 0);
- }
+ This is the situation after locating BI:
- return error;
-}
+   ===|=== before image ====|=== after image ===|===
+      ^                     ^
+      m_curr_row            m_curr_row_end
-int Update_rows_log_event::do_exec_row(TABLE *table)
-{
- DBUG_ASSERT(table != NULL);
+ BI found in the table is stored in record[0]. We copy it to record[1]
+ and unpack AI to record[0].
+ */
- int error= find_and_fetch_row(table, m_key);
- if (error)
- return error;
+ store_record(m_table,record[1]);
- /*
- We have to ensure that the new record (i.e., the after image) is
- in record[0] and the old record (i.e., the before image) is in
- record[1]. This since some storage engines require this (for
- example, the partition engine).
-
- Since find_and_fetch_row() puts the fetched record (i.e., the old
- record) in record[1], we can keep it there. We put the new record
- (i.e., the after image) into record[0], and copy the fields that
- are on the slave (i.e., in record[1]) into record[0], effectively
- overwriting the default values that where put there by the
- unpack_row() function.
- */
- bmove_align(table->record[0], m_after_image, table->s->reclength);
- copy_extra_record_fields(table, m_master_reclength, m_width);
+ m_curr_row= m_curr_row_end;
+ error= unpack_current_row(rli); // this also updates m_curr_row_end
/*
Now we have the right row to update. The old row (the one we're
- looking for) is in record[1] and the new row has is in record[0].
- We also have copied the original values already in the slave's
- database into the after image delivered from the master.
+ looking for) is in record[1] and the new row is in record[0].
*/
- error= table->file->ha_update_row(table->record[1], table->record[0]);
+#ifndef HAVE_purify
+ /*
+ Don't print debug messages when running valgrind since they can
+ trigger false warnings.
+ */
+ DBUG_PRINT("info",("Updating row in table"));
+ DBUG_DUMP("old record", m_table->record[1], m_table->s->reclength);
+ DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
+#endif
+
+ error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
if (error == HA_ERR_RECORD_IS_THE_SAME)
error= 0;
return error;
}
+
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
#ifdef MYSQL_CLIENT
@@ -8115,5 +8036,3 @@ Incident_log_event::write_data_body(IO_CACHE *file)
DBUG_ENTER("Incident_log_event::write_data_body");
DBUG_RETURN(write_str(file, m_message.str, m_message.length));
}
-
-