summaryrefslogtreecommitdiff
path: root/sql/log_event_old.cc
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-09-16 14:33:49 +0200
committerunknown <knielsen@knielsen-hq.org>2013-09-16 14:33:49 +0200
commit5633dd822711a269098bdb127c76c4b1250fcf8d (patch)
tree3359d76be8be47f5e21d5c6c7f7d3e60f835d752 /sql/log_event_old.cc
parentd107bdaa01ad34b1afb4542b981b9b19af499f7b (diff)
downloadmariadb-git-5633dd822711a269098bdb127c76c4b1250fcf8d.tar.gz
MDEV-4506: parallel replication.
Add a simple test case. Fix bugs found.
Diffstat (limited to 'sql/log_event_old.cc')
-rw-r--r--sql/log_event_old.cc97
1 files changed, 48 insertions, 49 deletions
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc
index db1b3fb5a9f..58f299dabe7 100644
--- a/sql/log_event_old.cc
+++ b/sql/log_event_old.cc
@@ -58,7 +58,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
*/
DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F));
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
+ rgi->slave_close_thread_tables(ev_thd);
ev_thd->clear_error();
DBUG_RETURN(0);
}
@@ -98,7 +98,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
*/
ev_thd->lex->set_stmt_row_injection();
- if (open_and_lock_tables(ev_thd, rli->tables_to_lock, FALSE, 0))
+ if (open_and_lock_tables(ev_thd, rgi->tables_to_lock, FALSE, 0))
{
uint actual_error= ev_thd->stmt_da->sql_errno();
if (ev_thd->is_slave_error || ev_thd->is_fatal_error)
@@ -113,7 +113,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
"unexpected success or fatal error"));
ev_thd->is_slave_error= 1;
}
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+ rgi->slave_close_thread_tables(thd);
DBUG_RETURN(actual_error);
}
@@ -126,8 +126,8 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
*/
{
- RPL_TABLE_LIST *ptr= rli->tables_to_lock;
- for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count);
+ RPL_TABLE_LIST *ptr= rgi->tables_to_lock;
+ for (uint i= 0 ; ptr&& (i< rgi->tables_to_lock_count);
ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++)
{
DBUG_ASSERT(ptr->m_tabledef_valid);
@@ -136,7 +136,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
ptr->table, &conv_table))
{
ev_thd->is_slave_error= 1;
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd);
+ rgi->slave_close_thread_tables(ev_thd);
DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF);
}
DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
@@ -161,15 +161,15 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
Old_rows_log_event, we can invalidate the query cache for the
associated table.
*/
- TABLE_LIST *ptr= rli->tables_to_lock;
- for (uint i=0; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++)
- const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
+ TABLE_LIST *ptr= rgi->tables_to_lock;
+ for (uint i=0; ptr && (i < rgi->tables_to_lock_count); ptr= ptr->next_global, i++)
+ rgi->m_table_map.set_table(ptr->table_id, ptr->table);
#ifdef HAVE_QUERY_CACHE
- query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock);
+ query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
#endif
}
- TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(ev->m_table_id);
+ TABLE* table= rgi->m_table_map.get_table(ev->m_table_id);
if (table)
{
@@ -220,7 +220,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
while (error == 0 && row_start < ev->m_rows_end)
{
uchar const *row_end= NULL;
- if ((error= do_prepare_row(ev_thd, rli, table, row_start, &row_end)))
+ if ((error= do_prepare_row(ev_thd, rgi, table, row_start, &row_end)))
break; // We should perform the after-row operation even in
// the case of error
@@ -280,7 +280,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
rollback at the caller along with sbr.
*/
ev_thd->reset_current_stmt_binlog_format_row();
- const_cast<Relay_log_info*>(rli)->cleanup_context(ev_thd, error);
+ rgi->cleanup_context(ev_thd, error);
ev_thd->is_slave_error= 1;
DBUG_RETURN(error);
}
@@ -953,7 +953,7 @@ int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
int
Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
- Relay_log_info const *rli,
+ rpl_group_info *rgi,
TABLE *table,
uchar const *row_start,
uchar const **row_end)
@@ -962,7 +962,7 @@ Write_rows_log_event_old::do_prepare_row(THD *thd_arg,
DBUG_ASSERT(row_start && row_end);
int error;
- error= unpack_row_old(const_cast<Relay_log_info*>(rli),
+ error= unpack_row_old(rgi,
table, m_width, table->record[0],
row_start, m_rows_end,
&m_cols, row_end, &m_master_reclength,
@@ -1037,7 +1037,7 @@ int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
int
Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
- Relay_log_info const *rli,
+ rpl_group_info *rgi,
TABLE *table,
uchar const *row_start,
uchar const **row_end)
@@ -1050,7 +1050,7 @@ Delete_rows_log_event_old::do_prepare_row(THD *thd_arg,
*/
DBUG_ASSERT(table->s->fields >= m_width);
- error= unpack_row_old(const_cast<Relay_log_info*>(rli),
+ error= unpack_row_old(rgi,
table, m_width, table->record[0],
row_start, m_rows_end,
&m_cols, row_end, &m_master_reclength,
@@ -1134,7 +1134,7 @@ int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error)
int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
- Relay_log_info const *rli,
+ rpl_group_info *rgi,
TABLE *table,
uchar const *row_start,
uchar const **row_end)
@@ -1148,14 +1148,14 @@ int Update_rows_log_event_old::do_prepare_row(THD *thd_arg,
DBUG_ASSERT(table->s->fields >= m_width);
/* record[0] is the before image for the update */
- error= unpack_row_old(const_cast<Relay_log_info*>(rli),
+ error= unpack_row_old(rgi,
table, m_width, table->record[0],
row_start, m_rows_end,
&m_cols, row_end, &m_master_reclength,
table->read_set, PRE_GA_UPDATE_ROWS_EVENT);
row_start = *row_end;
/* m_after_image is the after image for the update */
- error= unpack_row_old(const_cast<Relay_log_info*>(rli),
+ error= unpack_row_old(rgi,
table, m_width, m_after_image,
row_start, m_rows_end,
&m_cols, row_end, &m_master_reclength,
@@ -1471,7 +1471,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
*/
DBUG_ASSERT(get_flags(STMT_END_F));
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+ rgi->slave_close_thread_tables(thd);
thd->clear_error();
DBUG_RETURN(0);
}
@@ -1499,8 +1499,8 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
*/
lex_start(thd);
- if ((error= lock_tables(thd, rli->tables_to_lock,
- rli->tables_to_lock_count, 0)))
+ if ((error= lock_tables(thd, rgi->tables_to_lock,
+ rgi->tables_to_lock_count, 0)))
{
if (thd->is_slave_error || thd->is_fatal_error)
{
@@ -1522,7 +1522,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
"Error in %s event: when locking tables",
get_type_str());
}
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+ rgi->slave_close_thread_tables(thd);
DBUG_RETURN(error);
}
@@ -1535,8 +1535,8 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
*/
{
- RPL_TABLE_LIST *ptr= rli->tables_to_lock;
- for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count);
+ RPL_TABLE_LIST *ptr= rgi->tables_to_lock;
+ for (uint i= 0 ; ptr&& (i< rgi->tables_to_lock_count);
ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++)
{
TABLE *conv_table;
@@ -1544,7 +1544,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
ptr->table, &conv_table))
{
thd->is_slave_error= 1;
- const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
+ rgi->slave_close_thread_tables(thd);
DBUG_RETURN(ERR_BAD_TABLE_DEF);
}
ptr->m_conv_table= conv_table;
@@ -1566,18 +1566,18 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
Old_rows_log_event, we can invalidate the query cache for the
associated table.
*/
- for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
+ for (TABLE_LIST *ptr= rgi->tables_to_lock ; ptr ; ptr= ptr->next_global)
{
- const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
+ rgi->m_table_map.set_table(ptr->table_id, ptr->table);
}
#ifdef HAVE_QUERY_CACHE
- query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock);
+ query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
#endif
}
TABLE*
table=
- m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
+ m_table= rgi->m_table_map.get_table(m_table_id);
if (table)
{
@@ -1657,7 +1657,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
if (!table->in_use)
table->in_use= thd;
- error= do_exec_row(rli);
+ error= do_exec_row(rgi);
DBUG_PRINT("info", ("error: %d", error));
DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
@@ -1696,7 +1696,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
(ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
if (!m_curr_row_end && !error)
- unpack_current_row(rli);
+ unpack_current_row(rgi);
// at this moment m_curr_row_end should be set
DBUG_ASSERT(error || m_curr_row_end != NULL);
@@ -1733,7 +1733,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
rollback at the caller along with sbr.
*/
thd->reset_current_stmt_binlog_format_row();
- const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
+ rgi->cleanup_context(thd, error);
thd->is_slave_error= 1;
DBUG_RETURN(error);
}
@@ -1812,7 +1812,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
*/
thd->reset_current_stmt_binlog_format_row();
- const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
+ rgi->cleanup_context(thd, 0);
}
DBUG_RETURN(error);
@@ -1998,8 +1998,7 @@ void Old_rows_log_event::print_helper(FILE *file,
*/
int
-Old_rows_log_event::write_row(const Relay_log_info *const rli,
- const bool overwrite)
+Old_rows_log_event::write_row(rpl_group_info *rgi, const bool overwrite)
{
DBUG_ENTER("write_row");
DBUG_ASSERT(m_table != NULL && thd != NULL);
@@ -2016,7 +2015,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli,
DBUG_RETURN(error);
/* unpack row into table->record[0] */
- error= unpack_current_row(rli); // TODO: how to handle errors?
+ error= unpack_current_row(rgi); // TODO: how to handle errors?
#ifndef DBUG_OFF
DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
@@ -2123,7 +2122,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli,
if (!get_flags(COMPLETE_ROWS_F))
{
restore_record(table,record[1]);
- error= unpack_current_row(rli);
+ error= unpack_current_row(rgi);
}
#ifndef DBUG_OFF
@@ -2218,7 +2217,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli,
for any following update/delete command.
*/
-int Old_rows_log_event::find_row(const Relay_log_info *rli)
+int Old_rows_log_event::find_row(rpl_group_info *rgi)
{
DBUG_ENTER("find_row");
@@ -2231,7 +2230,7 @@ int Old_rows_log_event::find_row(const Relay_log_info *rli)
// TODO: shall we check and report errors here?
prepare_record(table, m_width, FALSE /* don't check errors */);
- error= unpack_current_row(rli);
+ error= unpack_current_row(rgi);
#ifndef DBUG_OFF
DBUG_PRINT("info",("looking for the following record"));
@@ -2603,10 +2602,10 @@ Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabili
int
-Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
+Write_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table != NULL);
- int error= write_row(rli, TRUE /* overwrite */);
+ int error= write_row(rgi, TRUE /* overwrite */);
if (error && !thd->net.last_errno)
thd->net.last_errno= error;
@@ -2705,12 +2704,12 @@ Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabil
}
-int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
+int Delete_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
{
int error;
DBUG_ASSERT(m_table != NULL);
- if (!(error= find_row(rli)))
+ if (!(error= find_row(rgi)))
{
/*
Delete the record found, located in record[0]
@@ -2804,11 +2803,11 @@ Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabil
int
-Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
+Update_rows_log_event_old::do_exec_row(rpl_group_info *rgi)
{
DBUG_ASSERT(m_table != NULL);
- int error= find_row(rli);
+ int error= find_row(rgi);
if (error)
{
/*
@@ -2816,7 +2815,7 @@ Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
able to skip to the next pair of updates
*/
m_curr_row= m_curr_row_end;
- unpack_current_row(rli);
+ unpack_current_row(rgi);
return error;
}
@@ -2834,7 +2833,7 @@ Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli)
store_record(m_table,record[1]);
m_curr_row= m_curr_row_end;
- error= unpack_current_row(rli); // this also updates m_curr_row_end
+ error= unpack_current_row(rgi); // this also updates m_curr_row_end
/*
Now we have the right row to update. The old row (the one we're