diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-09-16 14:33:49 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-09-16 14:33:49 +0200 |
commit | 5633dd822711a269098bdb127c76c4b1250fcf8d (patch) | |
tree | 3359d76be8be47f5e21d5c6c7f7d3e60f835d752 /sql/log_event_old.cc | |
parent | d107bdaa01ad34b1afb4542b981b9b19af499f7b (diff) | |
download | mariadb-git-5633dd822711a269098bdb127c76c4b1250fcf8d.tar.gz |
MDEV-4506: parallel replication.
Add a simple test case.
Fix bugs found.
Diffstat (limited to 'sql/log_event_old.cc')
-rw-r--r-- | sql/log_event_old.cc | 97 |
1 files changed, 48 insertions, 49 deletions
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index db1b3fb5a9f..58f299dabe7 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -58,7 +58,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) */ DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F)); - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd); + rgi->slave_close_thread_tables(ev_thd); ev_thd->clear_error(); DBUG_RETURN(0); } @@ -98,7 +98,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) */ ev_thd->lex->set_stmt_row_injection(); - if (open_and_lock_tables(ev_thd, rli->tables_to_lock, FALSE, 0)) + if (open_and_lock_tables(ev_thd, rgi->tables_to_lock, FALSE, 0)) { uint actual_error= ev_thd->stmt_da->sql_errno(); if (ev_thd->is_slave_error || ev_thd->is_fatal_error) @@ -113,7 +113,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) "unexpected success or fatal error")); ev_thd->is_slave_error= 1; } - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); DBUG_RETURN(actual_error); } @@ -126,8 +126,8 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) */ { - RPL_TABLE_LIST *ptr= rli->tables_to_lock; - for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count); + RPL_TABLE_LIST *ptr= rgi->tables_to_lock; + for (uint i= 0 ; ptr&& (i< rgi->tables_to_lock_count); ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++) { DBUG_ASSERT(ptr->m_tabledef_valid); @@ -136,7 +136,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) ptr->table, &conv_table)) { ev_thd->is_slave_error= 1; - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd); + rgi->slave_close_thread_tables(ev_thd); DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF); } DBUG_PRINT("debug", ("Table: %s.%s is compatible with master" @@ -161,15 +161,15 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) Old_rows_log_event, we can invalidate the query cache for the associated table. */ - TABLE_LIST *ptr= rli->tables_to_lock; - for (uint i=0; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++) - const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table); + TABLE_LIST *ptr= rgi->tables_to_lock; + for (uint i=0; ptr && (i < rgi->tables_to_lock_count); ptr= ptr->next_global, i++) + rgi->m_table_map.set_table(ptr->table_id, ptr->table); #ifdef HAVE_QUERY_CACHE - query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock); + query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock); #endif } - TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(ev->m_table_id); + TABLE* table= rgi->m_table_map.get_table(ev->m_table_id); if (table) { @@ -220,7 +220,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) while (error == 0 && row_start < ev->m_rows_end) { uchar const *row_end= NULL; - if ((error= do_prepare_row(ev_thd, rli, table, row_start, &row_end))) + if ((error= do_prepare_row(ev_thd, rgi, table, row_start, &row_end))) break; // We should perform the after-row operation even in // the case of error @@ -280,7 +280,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) rollback at the caller along with sbr. */ ev_thd->reset_current_stmt_binlog_format_row(); - const_cast<Relay_log_info*>(rli)->cleanup_context(ev_thd, error); + rgi->cleanup_context(ev_thd, error); ev_thd->is_slave_error= 1; DBUG_RETURN(error); } @@ -953,7 +953,7 @@ int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error) int Write_rows_log_event_old::do_prepare_row(THD *thd_arg, - Relay_log_info const *rli, + rpl_group_info *rgi, TABLE *table, uchar const *row_start, uchar const **row_end) @@ -962,7 +962,7 @@ Write_rows_log_event_old::do_prepare_row(THD *thd_arg, DBUG_ASSERT(row_start && row_end); int error; - error= unpack_row_old(const_cast<Relay_log_info*>(rli), + error= unpack_row_old(rgi, table, m_width, table->record[0], row_start, m_rows_end, &m_cols, row_end, &m_master_reclength, @@ -1037,7 +1037,7 @@ int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error) int Delete_rows_log_event_old::do_prepare_row(THD *thd_arg, - Relay_log_info const *rli, + rpl_group_info *rgi, TABLE *table, uchar const *row_start, uchar const **row_end) @@ -1050,7 +1050,7 @@ Delete_rows_log_event_old::do_prepare_row(THD *thd_arg, */ DBUG_ASSERT(table->s->fields >= m_width); - error= unpack_row_old(const_cast<Relay_log_info*>(rli), + error= unpack_row_old(rgi, table, m_width, table->record[0], row_start, m_rows_end, &m_cols, row_end, &m_master_reclength, @@ -1134,7 +1134,7 @@ int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error) int Update_rows_log_event_old::do_prepare_row(THD *thd_arg, - Relay_log_info const *rli, + rpl_group_info *rgi, TABLE *table, uchar const *row_start, uchar const **row_end) @@ -1148,14 +1148,14 @@ int Update_rows_log_event_old::do_prepare_row(THD *thd_arg, DBUG_ASSERT(table->s->fields >= m_width); /* record[0] is the before image for the update */ - error= unpack_row_old(const_cast<Relay_log_info*>(rli), + error= unpack_row_old(rgi, table, m_width, table->record[0], row_start, m_rows_end, &m_cols, row_end, &m_master_reclength, table->read_set, PRE_GA_UPDATE_ROWS_EVENT); row_start = *row_end; /* m_after_image is the after image for the update */ - error= unpack_row_old(const_cast<Relay_log_info*>(rli), + error= unpack_row_old(rgi, table, m_width, m_after_image, row_start, m_rows_end, &m_cols, row_end, &m_master_reclength, @@ -1471,7 +1471,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) */ DBUG_ASSERT(get_flags(STMT_END_F)); - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); thd->clear_error(); DBUG_RETURN(0); } @@ -1499,8 +1499,8 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) */ lex_start(thd); - if ((error= lock_tables(thd, rli->tables_to_lock, - rli->tables_to_lock_count, 0))) + if ((error= lock_tables(thd, rgi->tables_to_lock, + rgi->tables_to_lock_count, 0))) { if (thd->is_slave_error || thd->is_fatal_error) { @@ -1522,7 +1522,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) "Error in %s event: when locking tables", get_type_str()); } - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); DBUG_RETURN(error); } @@ -1535,8 +1535,8 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) */ { - RPL_TABLE_LIST *ptr= rli->tables_to_lock; - for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count); + RPL_TABLE_LIST *ptr= rgi->tables_to_lock; + for (uint i= 0 ; ptr&& (i< rgi->tables_to_lock_count); ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++) { TABLE *conv_table; @@ -1544,7 +1544,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) ptr->table, &conv_table)) { thd->is_slave_error= 1; - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); DBUG_RETURN(ERR_BAD_TABLE_DEF); } ptr->m_conv_table= conv_table; @@ -1566,18 +1566,18 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) Old_rows_log_event, we can invalidate the query cache for the associated table. */ - for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global) + for (TABLE_LIST *ptr= rgi->tables_to_lock ; ptr ; ptr= ptr->next_global) { - const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table); + rgi->m_table_map.set_table(ptr->table_id, ptr->table); } #ifdef HAVE_QUERY_CACHE - query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock); + query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock); #endif } TABLE* table= - m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id); + m_table= rgi->m_table_map.get_table(m_table_id); if (table) { @@ -1657,7 +1657,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) if (!table->in_use) table->in_use= thd; - error= do_exec_row(rli); + error= do_exec_row(rgi); DBUG_PRINT("info", ("error: %d", error)); DBUG_ASSERT(error != HA_ERR_RECORD_DELETED); @@ -1696,7 +1696,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end)); if (!m_curr_row_end && !error) - unpack_current_row(rli); + unpack_current_row(rgi); // at this moment m_curr_row_end should be set DBUG_ASSERT(error || m_curr_row_end != NULL); @@ -1733,7 +1733,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) rollback at the caller along with sbr. */ thd->reset_current_stmt_binlog_format_row(); - const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); + rgi->cleanup_context(thd, error); thd->is_slave_error= 1; DBUG_RETURN(error); } @@ -1812,7 +1812,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) */ thd->reset_current_stmt_binlog_format_row(); - const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0); + rgi->cleanup_context(thd, 0); } DBUG_RETURN(error); @@ -1998,8 +1998,7 @@ void Old_rows_log_event::print_helper(FILE *file, */ int -Old_rows_log_event::write_row(const Relay_log_info *const rli, - const bool overwrite) +Old_rows_log_event::write_row(rpl_group_info *rgi, const bool overwrite) { DBUG_ENTER("write_row"); DBUG_ASSERT(m_table != NULL && thd != NULL); @@ -2016,7 +2015,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli, DBUG_RETURN(error); /* unpack row into table->record[0] */ - error= unpack_current_row(rli); // TODO: how to handle errors? + error= unpack_current_row(rgi); // TODO: how to handle errors? #ifndef DBUG_OFF DBUG_DUMP("record[0]", table->record[0], table->s->reclength); @@ -2123,7 +2122,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli, if (!get_flags(COMPLETE_ROWS_F)) { restore_record(table,record[1]); - error= unpack_current_row(rli); + error= unpack_current_row(rgi); } #ifndef DBUG_OFF @@ -2218,7 +2217,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli, for any following update/delete command. */ -int Old_rows_log_event::find_row(const Relay_log_info *rli) +int Old_rows_log_event::find_row(rpl_group_info *rgi) { DBUG_ENTER("find_row"); @@ -2231,7 +2230,7 @@ int Old_rows_log_event::find_row(const Relay_log_info *rli) // TODO: shall we check and report errors here? prepare_record(table, m_width, FALSE /* don't check errors */); - error= unpack_current_row(rli); + error= unpack_current_row(rgi); #ifndef DBUG_OFF DBUG_PRINT("info",("looking for the following record")); @@ -2603,10 +2602,10 @@ Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabili int -Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +Write_rows_log_event_old::do_exec_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table != NULL); - int error= write_row(rli, TRUE /* overwrite */); + int error= write_row(rgi, TRUE /* overwrite */); if (error && !thd->net.last_errno) thd->net.last_errno= error; @@ -2705,12 +2704,12 @@ Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabil } -int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +int Delete_rows_log_event_old::do_exec_row(rpl_group_info *rgi) { int error; DBUG_ASSERT(m_table != NULL); - if (!(error= find_row(rli))) + if (!(error= find_row(rgi))) { /* Delete the record found, located in record[0] @@ -2804,11 +2803,11 @@ Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabil int -Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +Update_rows_log_event_old::do_exec_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table != NULL); - int error= find_row(rli); + int error= find_row(rgi); if (error) { /* @@ -2816,7 +2815,7 @@ Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) able to skip to the next pair of updates */ m_curr_row= m_curr_row_end; - unpack_current_row(rli); + unpack_current_row(rgi); return error; } @@ -2834,7 +2833,7 @@ Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) store_record(m_table,record[1]); m_curr_row= m_curr_row_end; - error= unpack_current_row(rli); // this also updates m_curr_row_end + error= unpack_current_row(rgi); // this also updates m_curr_row_end /* Now we have the right row to update. The old row (the one we're |