diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-09-16 14:33:49 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-09-16 14:33:49 +0200 |
commit | 5633dd822711a269098bdb127c76c4b1250fcf8d (patch) | |
tree | 3359d76be8be47f5e21d5c6c7f7d3e60f835d752 | |
parent | d107bdaa01ad34b1afb4542b981b9b19af499f7b (diff) | |
download | mariadb-git-5633dd822711a269098bdb127c76c4b1250fcf8d.tar.gz |
MDEV-4506: parallel replication.
Add a simple test case.
Fix bugs found.
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel.result | 45 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel.test | 102 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel2.test | 70 | ||||
-rw-r--r-- | sql/log_event.cc | 90 | ||||
-rw-r--r-- | sql/log_event.h | 16 | ||||
-rw-r--r-- | sql/log_event_old.cc | 97 | ||||
-rw-r--r-- | sql/log_event_old.h | 24 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 14 | ||||
-rw-r--r-- | sql/rpl_record.cc | 10 | ||||
-rw-r--r-- | sql/rpl_record.h | 4 | ||||
-rw-r--r-- | sql/rpl_record_old.cc | 6 | ||||
-rw-r--r-- | sql/rpl_record_old.h | 2 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 251 | ||||
-rw-r--r-- | sql/rpl_rli.h | 49 | ||||
-rw-r--r-- | sql/slave.cc | 18 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 2 |
16 files changed, 399 insertions, 401 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result new file mode 100644 index 00000000000..e60b9406b8e --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_parallel.result @@ -0,0 +1,45 @@ +include/rpl_init.inc [topology=1->2] +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=10; +ERROR HY000: This operation cannot be performed as you have a running slave ''; run STOP SLAVE '' first +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=10; +CHANGE MASTER TO master_use_gtid=slave_pos; +include/start_slave.inc +*** Test long-running query in domain 1 can run in parallel with short queries in domain 0 *** +CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM; +CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1); +INSERT INTO t2 VALUES (1); +LOCK TABLE t1 WRITE; +SET gtid_domain_id=1; +INSERT INTO t1 VALUES (2); +SET gtid_domain_id=0; +INSERT INTO t2 VALUES (2); +INSERT INTO t2 VALUES (3); +BEGIN; +INSERT INTO t2 VALUES (4); +INSERT INTO t2 VALUES (5); +COMMIT; +INSERT INTO t2 VALUES (6); +SELECT * FROM t2 ORDER by a; +a +1 +2 +3 +4 +5 +6 +SELECT * FROM t1; +a +1 +UNLOCK TABLES; +SELECT * FROM t1 ORDER BY a; +a +1 +2 +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +include/start_slave.inc +DROP TABLE t1,t2; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index 3ace346e006..b9ba88489e4 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -1,60 +1,74 @@ ---source include/have_binlog_format_statement.inc +--source include/have_innodb.inc +--source include/have_debug.inc +--source include/have_debug_sync.inc +--let $rpl_topology=1->2 +--source include/rpl_init.inc -connect (s1,127.0.0.1,root,,test,$MASTER_MYPORT,); -connect (s2,127.0.0.1,root,,test,$SLAVE_MYPORT,); +# Test various aspects of parallel replication. ---connection s1 -SELECT @@server_id; -SET sql_log_bin=0; -CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=MyISAM; -SET sql_log_bin=1; +--connection server_2 +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +--error ER_SLAVE_MUST_STOP +SET GLOBAL slave_parallel_threads=10; +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=10; +CHANGE MASTER TO master_use_gtid=slave_pos; +--source include/start_slave.inc ---connection s2 -SELECT @@server_id; -SET sql_log_bin=0; -CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=MyISAM; -SET sql_log_bin=1; ---replace_result $MASTER_MYPORT MASTER_PORT -eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT, - master_user='root', master_use_gtid=current_pos; +--echo *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 *** ---connection s1 -SET gtid_domain_id=0; +--connection server_1 +CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM; +CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB; INSERT INTO t1 VALUES (1); +INSERT INTO t2 VALUES (1); +--save_master_pos + +--connection server_2 +--sync_with_master + +# Block the table t1 to simulate a replicated query taking a long time. +--connect (con_temp,127.0.0.1,root,,test,$SERVER_MYPORT_2,) +LOCK TABLE t1 WRITE; + +--connection server_1 SET gtid_domain_id=1; +# This query will be blocked on the slave until UNLOCK TABLES. INSERT INTO t1 VALUES (2); -SET gtid_domain_id=2; -INSERT INTO t1 VALUES (3); -SET gtid_domain_id=0; -INSERT INTO t1 VALUES (4); -SET gtid_domain_id=1; -INSERT INTO t1 VALUES (5); -SET gtid_domain_id=2; -INSERT INTO t1 VALUES (6); SET gtid_domain_id=0; -INSERT INTO t1 VALUES (7); -SET gtid_domain_id=1; -INSERT INTO t1 VALUES (8); -SET gtid_domain_id=2; -INSERT INTO t1 VALUES (9); +# These t2 queries can be replicated in parallel with the prior t1 query, as +# they are in a separate replication domain. +INSERT INTO t2 VALUES (2); +INSERT INTO t2 VALUES (3); +BEGIN; +INSERT INTO t2 VALUES (4); +INSERT INTO t2 VALUES (5); +COMMIT; +INSERT INTO t2 VALUES (6); ---connection s2 -query_vertical SHOW SLAVE STATUS; +--connection server_2 +--let $wait_condition= SELECT COUNT(*) = 6 FROM t2 +--source include/wait_condition.inc ---source include/start_slave.inc +SELECT * FROM t2 ORDER by a; + +--connection con_temp SELECT * FROM t1; +UNLOCK TABLES; + +--connection server_2 +--let $wait_condition= SELECT COUNT(*) = 2 FROM t1 +--source include/wait_condition.inc +SELECT * FROM t1 ORDER BY a; + +--connection server_2 --source include/stop_slave.inc -SELECT * FROM t1; +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +--source include/start_slave.inc ---connection s1 -SET sql_log_bin=0; -DROP TABLE t1; -SET sql_log_bin=1; +--connection server_1 +DROP TABLE t1,t2; ---connection s2 -RESET SLAVE ALL; -SET sql_log_bin=0; -DROP TABLE t1; -SET sql_log_bin=1; +--source include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel2.test b/mysql-test/suite/rpl/t/rpl_parallel2.test deleted file mode 100644 index b3f970c909a..00000000000 --- a/mysql-test/suite/rpl/t/rpl_parallel2.test +++ /dev/null @@ -1,70 +0,0 @@ ---source include/have_binlog_format_statement.inc ---source include/have_xtradb.inc - -connect (m1,127.0.0.1,root,,test,$MASTER_MYPORT,); -connect (m2,127.0.0.1,root,,test,$MASTER_MYPORT,); -connect (m3,127.0.0.1,root,,test,$MASTER_MYPORT,); -connect (m4,127.0.0.1,root,,test,$MASTER_MYPORT,); -connect (s1,127.0.0.1,root,,test,$SLAVE_MYPORT,); -connect (s2,127.0.0.1,root,,test,$SLAVE_MYPORT,); -connect (s3,127.0.0.1,root,,test,$SLAVE_MYPORT,); -connect (s4,127.0.0.1,root,,test,$SLAVE_MYPORT,); - ---connection m1 -SELECT @@server_id; -SET sql_log_bin=0; -CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; -SET sql_log_bin=1; -SET @old_count= @@GLOBAL.binlog_commit_wait_count; -SET @old_usec= @@GLOBAL.binlog_commit_wait_usec; -SET GLOBAL binlog_commit_wait_usec = 30*1000000; - ---connection s1 -SELECT @@server_id; -SET sql_log_bin=0; -CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; -SET sql_log_bin=1; - ---replace_result $MASTER_MYPORT MASTER_PORT -eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT, - master_user='root', master_use_gtid=current_pos; - ---connection m1 -SET GLOBAL binlog_commit_wait_count = 4; - -send INSERT INTO t1 VALUES (1); - ---connection m2 -send INSERT INTO t1 VALUES (2); ---connection m3 -send INSERT INTO t1 VALUES (3); ---connection m4 -INSERT INTO t1 VALUES (4); ---connection m1 -reap; ---connection m2 -reap; ---connection m3 -reap; - ---connection m1 -SHOW BINLOG EVENTS; - ---connection s1 ---source include/start_slave.inc -SELECT * FROM t1; ---source include/stop_slave.inc -SELECT * FROM t1; - ---connection m1 -SET sql_log_bin=0; -DROP TABLE t1; -SET sql_log_bin=1; -SET GLOBAL binlog_commit_wait_count= @old_count; -SET GLOBAL binlog_commit_wait_usec= @old_usec; - ---connection s1 -RESET SLAVE ALL; -SET sql_log_bin=0; -DROP TABLE t1; -SET sql_log_bin=1; diff --git a/sql/log_event.cc b/sql/log_event.cc index c0a2ebfa365..cfbdd6aa626 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -130,7 +130,7 @@ const ulong checksum_version_product_mariadb= checksum_version_split_mariadb[2]; #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD* thd); +static int rows_event_stmt_cleanup(rpl_group_info *rgi, THD* thd); static const char *HA_ERR(int i) { @@ -3854,7 +3854,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); - if (strcmp("COMMIT", query) == 0 && rli->tables_to_lock) + if (strcmp("COMMIT", query) == 0 && rgi->tables_to_lock) { /* Cleaning-up the last statement context: @@ -3863,7 +3863,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, */ int error; char llbuff[22]; - if ((error= rows_event_stmt_cleanup(const_cast<Relay_log_info*>(rli), thd))) + if ((error= rows_event_stmt_cleanup(rgi, thd))) { const_cast<Relay_log_info*>(rli)->report(ERROR_LEVEL, error, "Error in cleaning up after an event preceeding the commit; " @@ -3883,7 +3883,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, } else { - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); } /* @@ -4835,7 +4835,7 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi) "or ROLLBACK in relay log). A probable cause is that " "the master died while writing the transaction to " "its binary log, thus rolled back too."); - const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 1); + rgi->cleanup_context(thd, 1); } /* @@ -5533,7 +5533,7 @@ int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi, clear_all_errors(thd, const_cast<Relay_log_info*>(rli)); /* see Query_log_event::do_apply_event() and BUG#13360 */ - DBUG_ASSERT(!rli->m_table_map.count()); + DBUG_ASSERT(!rgi->m_table_map.count()); /* Usually lex_start() is called by mysql_parse(), but we need it here as the present method does not call mysql_parse(). @@ -9089,7 +9089,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) */ DBUG_ASSERT(get_flags(STMT_END_F)); - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); thd->clear_error(); DBUG_RETURN(0); } @@ -9151,7 +9151,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) /* A small test to verify that objects have consistent types */ DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS)); - if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0)) + if (open_and_lock_tables(thd, rgi->tables_to_lock, FALSE, 0)) { uint actual_error= thd->stmt_da->sql_errno(); if (thd->is_slave_error || thd->is_fatal_error) @@ -9168,7 +9168,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) "unexpected success or fatal error")); thd->is_slave_error= 1; } - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); DBUG_RETURN(actual_error); } @@ -9182,7 +9182,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) { DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p", - rli->tables_to_lock)); + rgi->tables_to_lock)); /** When using RBR and MyISAM MERGE tables the base tables that make @@ -9196,8 +9196,8 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) NOTE: The base tables are added here are removed when close_thread_tables is called. */ - RPL_TABLE_LIST *ptr= rli->tables_to_lock; - for (uint i= 0 ; ptr && (i < rli->tables_to_lock_count); + RPL_TABLE_LIST *ptr= rgi->tables_to_lock; + for (uint i= 0 ; ptr && (i < rgi->tables_to_lock_count); ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++) { DBUG_ASSERT(ptr->m_tabledef_valid); @@ -9213,7 +9213,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) having severe errors which should not be skiped. */ thd->is_slave_error= 1; - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); DBUG_RETURN(ERR_BAD_TABLE_DEF); } DBUG_PRINT("debug", ("Table: %s.%s is compatible with master" @@ -9238,18 +9238,18 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) Rows_log_event, we can invalidate the query cache for the associated table. */ - TABLE_LIST *ptr= rli->tables_to_lock; - for (uint i=0 ; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++) - const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table); + TABLE_LIST *ptr= rgi->tables_to_lock; + for (uint i=0 ; ptr && (i < rgi->tables_to_lock_count); ptr= ptr->next_global, i++) + rgi->m_table_map.set_table(ptr->table_id, ptr->table); #ifdef HAVE_QUERY_CACHE - query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock); + query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock); #endif } TABLE* table= - m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id); + m_table= rgi->m_table_map.get_table(m_table_id); DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id)); @@ -9331,7 +9331,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) if (!table->in_use) table->in_use= thd; - error= do_exec_row(rli); + error= do_exec_row(rgi); if (error) DBUG_PRINT("info", ("error: %s", HA_ERR(error))); @@ -9371,7 +9371,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end)); if (!m_curr_row_end && !error) - error= unpack_current_row(rli); + error= unpack_current_row(rgi); // at this moment m_curr_row_end should be set DBUG_ASSERT(error || m_curr_row_end != NULL); @@ -9432,7 +9432,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) DBUG_RETURN(error); } - if (get_flags(STMT_END_F) && (error= rows_event_stmt_cleanup(rli, thd))) + if (get_flags(STMT_END_F) && (error= rows_event_stmt_cleanup(rgi, thd))) slave_rows_error_report(ERROR_LEVEL, thd->is_error() ? 0 : error, rli, thd, table, @@ -9466,7 +9466,7 @@ Rows_log_event::do_shall_skip(Relay_log_info *rli) @retval non-zero Error at the commit. */ -static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) +static int rows_event_stmt_cleanup(rpl_group_info *rgi, THD * thd) { int error; { @@ -9520,7 +9520,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd) */ thd->reset_current_stmt_binlog_format_row(); - const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0); + rgi->cleanup_context(thd, 0); } return error; } @@ -10259,10 +10259,11 @@ enum enum_tbl_map_status rli->tables_to_lock. */ static enum_tbl_map_status -check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list) +check_table_map(rpl_group_info *rgi, RPL_TABLE_LIST *table_list) { DBUG_ENTER("check_table_map"); enum_tbl_map_status res= OK_TO_PROCESS; + Relay_log_info *rli= rgi->rli; if (rli->sql_thd->slave_thread /* filtering is for slave only */ && (!rli->mi->rpl_filter->db_ok(table_list->db) || @@ -10270,8 +10271,8 @@ check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list) res= FILTERED_OUT; else { - RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rli->tables_to_lock); - for(uint i=0 ; ptr && (i< rli->tables_to_lock_count); + RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rgi->tables_to_lock); + for(uint i=0 ; ptr && (i< rgi->tables_to_lock_count); ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_local), i++) { if (ptr->table_id == table_list->table_id) @@ -10303,7 +10304,6 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi) Rpl_filter *filter; Relay_log_info const *rli= rgi->rli; DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)"); - DBUG_ASSERT(rli->sql_thd == thd); /* Step the query id to mark what columns that are actually used. */ thd->set_query_id(next_query_id()); @@ -10328,7 +10328,7 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi) table_list->updating= 1; table_list->required_type= FRMTYPE_TABLE; DBUG_PRINT("debug", ("table: %s is mapped to %u", table_list->table_name, table_list->table_id)); - enum_tbl_map_status tblmap_status= check_table_map(rli, table_list); + enum_tbl_map_status tblmap_status= check_table_map(rgi, table_list); if (tblmap_status == OK_TO_PROCESS) { DBUG_ASSERT(thd->lex->query_tables != table_list); @@ -10354,9 +10354,9 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi) We record in the slave's information that the table should be locked by linking the table into the list of tables to lock. */ - table_list->next_global= table_list->next_local= rli->tables_to_lock; - const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list; - const_cast<Relay_log_info*>(rli)->tables_to_lock_count++; + table_list->next_global= table_list->next_local= rgi->tables_to_lock; + rgi->tables_to_lock= table_list; + rgi->tables_to_lock_count++; /* 'memory' is freed in clear_tables_to_lock */ } else // FILTERED_OUT, SAME_ID_MAPPING_* @@ -10709,7 +10709,7 @@ is_duplicate_key_error(int errcode) */ int -Rows_log_event::write_row(const Relay_log_info *const rli, +Rows_log_event::write_row(rpl_group_info *rgi, const bool overwrite) { DBUG_ENTER("write_row"); @@ -10724,7 +10724,7 @@ Rows_log_event::write_row(const Relay_log_info *const rli, table->file->ht->db_type != DB_TYPE_NDBCLUSTER); /* unpack row into table->record[0] */ - if ((error= unpack_current_row(rli))) + if ((error= unpack_current_row(rgi))) DBUG_RETURN(error); if (m_curr_row == m_rows_buf) @@ -10841,7 +10841,7 @@ Rows_log_event::write_row(const Relay_log_info *const rli, if (!get_flags(COMPLETE_ROWS_F)) { restore_record(table,record[1]); - error= unpack_current_row(rli); + error= unpack_current_row(rgi); } #ifndef DBUG_OFF @@ -10907,10 +10907,10 @@ Rows_log_event::write_row(const Relay_log_info *const rli, #endif int -Write_rows_log_event::do_exec_row(const Relay_log_info *const rli) +Write_rows_log_event::do_exec_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table != NULL); - int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT); + int error= write_row(rgi, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT); if (error && !thd->is_error()) { @@ -11214,7 +11214,7 @@ void issue_long_find_row_warning(Log_event_type type, for any following update/delete command. */ -int Rows_log_event::find_row(const Relay_log_info *rli) +int Rows_log_event::find_row(rpl_group_info *rgi) { DBUG_ENTER("Rows_log_event::find_row"); @@ -11232,7 +11232,7 @@ int Rows_log_event::find_row(const Relay_log_info *rli) */ prepare_record(table, m_width, FALSE); - error= unpack_current_row(rli); + error= unpack_current_row(rgi); #ifndef DBUG_OFF DBUG_PRINT("info",("looking for the following record")); @@ -11497,7 +11497,7 @@ int Rows_log_event::find_row(const Relay_log_info *rli) end: if (is_table_scan || is_index_scan) issue_long_find_row_warning(get_type_code(), m_table->alias.c_ptr(), - is_index_scan, rli); + is_index_scan, rgi->rli); table->default_column_bitmaps(); DBUG_RETURN(error); } @@ -11565,12 +11565,12 @@ Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability return error; } -int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli) +int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) { int error; DBUG_ASSERT(m_table != NULL); - if (!(error= find_row(rli))) + if (!(error= find_row(rgi))) { /* Delete the record found, located in record[0] @@ -11691,11 +11691,11 @@ Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability } int -Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) +Update_rows_log_event::do_exec_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table != NULL); - int error= find_row(rli); + int error= find_row(rgi); if (error) { /* @@ -11703,7 +11703,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) able to skip to the next pair of updates */ m_curr_row= m_curr_row_end; - unpack_current_row(rli); + unpack_current_row(rgi); return error; } @@ -11722,7 +11722,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) m_curr_row= m_curr_row_end; /* this also updates m_curr_row_end */ - if ((error= unpack_current_row(rli))) + if ((error= unpack_current_row(rgi))) goto err; /* diff --git a/sql/log_event.h b/sql/log_event.h index 1dc7f516727..d689ebcd582 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -4256,16 +4256,16 @@ protected: uint m_key_nr; /* Key number */ int find_key(); // Find a best key to use in find_row() - int find_row(const Relay_log_info *const); - int write_row(const Relay_log_info *const, const bool); + int find_row(rpl_group_info *); + int write_row(rpl_group_info *, const bool); // Unpack the current row into m_table->record[0] - int unpack_current_row(const Relay_log_info *const rli) + int unpack_current_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table); ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT); - int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, + int const result= ::unpack_row(rgi, m_table, m_width, m_curr_row, m_rows_end, &m_cols, &m_curr_row_end, &m_master_reclength); if (m_curr_row_end > m_rows_end) @@ -4331,7 +4331,7 @@ private: 0 if execution succeeded, 1 if execution failed. */ - virtual int do_exec_row(const Relay_log_info *const rli) = 0; + virtual int do_exec_row(rpl_group_info *rli) = 0; #endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */ friend class Old_rows_log_event; @@ -4387,7 +4387,7 @@ private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) virtual int do_before_row_operations(const Slave_reporting_capability *const); virtual int do_after_row_operations(const Slave_reporting_capability *const,int); - virtual int do_exec_row(const Relay_log_info *const); + virtual int do_exec_row(rpl_group_info *); #endif }; @@ -4461,7 +4461,7 @@ protected: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) virtual int do_before_row_operations(const Slave_reporting_capability *const); virtual int do_after_row_operations(const Slave_reporting_capability *const,int); - virtual int do_exec_row(const Relay_log_info *const); + virtual int do_exec_row(rpl_group_info *); #endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */ }; @@ -4526,7 +4526,7 @@ protected: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) virtual int do_before_row_operations(const Slave_reporting_capability *const); virtual int do_after_row_operations(const Slave_reporting_capability *const,int); - virtual int do_exec_row(const Relay_log_info *const); + virtual int do_exec_row(rpl_group_info *); #endif }; diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index db1b3fb5a9f..58f299dabe7 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -58,7 +58,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) */ DBUG_ASSERT(ev->get_flags(Old_rows_log_event::STMT_END_F)); - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd); + rgi->slave_close_thread_tables(ev_thd); ev_thd->clear_error(); DBUG_RETURN(0); } @@ -98,7 +98,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) */ ev_thd->lex->set_stmt_row_injection(); - if (open_and_lock_tables(ev_thd, rli->tables_to_lock, FALSE, 0)) + if (open_and_lock_tables(ev_thd, rgi->tables_to_lock, FALSE, 0)) { uint actual_error= ev_thd->stmt_da->sql_errno(); if (ev_thd->is_slave_error || ev_thd->is_fatal_error) @@ -113,7 +113,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) "unexpected success or fatal error")); ev_thd->is_slave_error= 1; } - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); DBUG_RETURN(actual_error); } @@ -126,8 +126,8 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) */ { - RPL_TABLE_LIST *ptr= rli->tables_to_lock; - for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count); + RPL_TABLE_LIST *ptr= rgi->tables_to_lock; + for (uint i= 0 ; ptr&& (i< rgi->tables_to_lock_count); ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++) { DBUG_ASSERT(ptr->m_tabledef_valid); @@ -136,7 +136,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) ptr->table, &conv_table)) { ev_thd->is_slave_error= 1; - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(ev_thd); + rgi->slave_close_thread_tables(ev_thd); DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF); } DBUG_PRINT("debug", ("Table: %s.%s is compatible with master" @@ -161,15 +161,15 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) Old_rows_log_event, we can invalidate the query cache for the associated table. */ - TABLE_LIST *ptr= rli->tables_to_lock; - for (uint i=0; ptr && (i < rli->tables_to_lock_count); ptr= ptr->next_global, i++) - const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table); + TABLE_LIST *ptr= rgi->tables_to_lock; + for (uint i=0; ptr && (i < rgi->tables_to_lock_count); ptr= ptr->next_global, i++) + rgi->m_table_map.set_table(ptr->table_id, ptr->table); #ifdef HAVE_QUERY_CACHE - query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock); + query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock); #endif } - TABLE* table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(ev->m_table_id); + TABLE* table= rgi->m_table_map.get_table(ev->m_table_id); if (table) { @@ -220,7 +220,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) while (error == 0 && row_start < ev->m_rows_end) { uchar const *row_end= NULL; - if ((error= do_prepare_row(ev_thd, rli, table, row_start, &row_end))) + if ((error= do_prepare_row(ev_thd, rgi, table, row_start, &row_end))) break; // We should perform the after-row operation even in // the case of error @@ -280,7 +280,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi) rollback at the caller along with sbr. */ ev_thd->reset_current_stmt_binlog_format_row(); - const_cast<Relay_log_info*>(rli)->cleanup_context(ev_thd, error); + rgi->cleanup_context(ev_thd, error); ev_thd->is_slave_error= 1; DBUG_RETURN(error); } @@ -953,7 +953,7 @@ int Write_rows_log_event_old::do_after_row_operations(TABLE *table, int error) int Write_rows_log_event_old::do_prepare_row(THD *thd_arg, - Relay_log_info const *rli, + rpl_group_info *rgi, TABLE *table, uchar const *row_start, uchar const **row_end) @@ -962,7 +962,7 @@ Write_rows_log_event_old::do_prepare_row(THD *thd_arg, DBUG_ASSERT(row_start && row_end); int error; - error= unpack_row_old(const_cast<Relay_log_info*>(rli), + error= unpack_row_old(rgi, table, m_width, table->record[0], row_start, m_rows_end, &m_cols, row_end, &m_master_reclength, @@ -1037,7 +1037,7 @@ int Delete_rows_log_event_old::do_after_row_operations(TABLE *table, int error) int Delete_rows_log_event_old::do_prepare_row(THD *thd_arg, - Relay_log_info const *rli, + rpl_group_info *rgi, TABLE *table, uchar const *row_start, uchar const **row_end) @@ -1050,7 +1050,7 @@ Delete_rows_log_event_old::do_prepare_row(THD *thd_arg, */ DBUG_ASSERT(table->s->fields >= m_width); - error= unpack_row_old(const_cast<Relay_log_info*>(rli), + error= unpack_row_old(rgi, table, m_width, table->record[0], row_start, m_rows_end, &m_cols, row_end, &m_master_reclength, @@ -1134,7 +1134,7 @@ int Update_rows_log_event_old::do_after_row_operations(TABLE *table, int error) int Update_rows_log_event_old::do_prepare_row(THD *thd_arg, - Relay_log_info const *rli, + rpl_group_info *rgi, TABLE *table, uchar const *row_start, uchar const **row_end) @@ -1148,14 +1148,14 @@ int Update_rows_log_event_old::do_prepare_row(THD *thd_arg, DBUG_ASSERT(table->s->fields >= m_width); /* record[0] is the before image for the update */ - error= unpack_row_old(const_cast<Relay_log_info*>(rli), + error= unpack_row_old(rgi, table, m_width, table->record[0], row_start, m_rows_end, &m_cols, row_end, &m_master_reclength, table->read_set, PRE_GA_UPDATE_ROWS_EVENT); row_start = *row_end; /* m_after_image is the after image for the update */ - error= unpack_row_old(const_cast<Relay_log_info*>(rli), + error= unpack_row_old(rgi, table, m_width, m_after_image, row_start, m_rows_end, &m_cols, row_end, &m_master_reclength, @@ -1471,7 +1471,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) */ DBUG_ASSERT(get_flags(STMT_END_F)); - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); thd->clear_error(); DBUG_RETURN(0); } @@ -1499,8 +1499,8 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) */ lex_start(thd); - if ((error= lock_tables(thd, rli->tables_to_lock, - rli->tables_to_lock_count, 0))) + if ((error= lock_tables(thd, rgi->tables_to_lock, + rgi->tables_to_lock_count, 0))) { if (thd->is_slave_error || thd->is_fatal_error) { @@ -1522,7 +1522,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) "Error in %s event: when locking tables", get_type_str()); } - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); DBUG_RETURN(error); } @@ -1535,8 +1535,8 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) */ { - RPL_TABLE_LIST *ptr= rli->tables_to_lock; - for (uint i= 0 ; ptr&& (i< rli->tables_to_lock_count); + RPL_TABLE_LIST *ptr= rgi->tables_to_lock; + for (uint i= 0 ; ptr&& (i< rgi->tables_to_lock_count); ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global), i++) { TABLE *conv_table; @@ -1544,7 +1544,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) ptr->table, &conv_table)) { thd->is_slave_error= 1; - const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); DBUG_RETURN(ERR_BAD_TABLE_DEF); } ptr->m_conv_table= conv_table; @@ -1566,18 +1566,18 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) Old_rows_log_event, we can invalidate the query cache for the associated table. */ - for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global) + for (TABLE_LIST *ptr= rgi->tables_to_lock ; ptr ; ptr= ptr->next_global) { - const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table); + rgi->m_table_map.set_table(ptr->table_id, ptr->table); } #ifdef HAVE_QUERY_CACHE - query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock); + query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock); #endif } TABLE* table= - m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id); + m_table= rgi->m_table_map.get_table(m_table_id); if (table) { @@ -1657,7 +1657,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) if (!table->in_use) table->in_use= thd; - error= do_exec_row(rli); + error= do_exec_row(rgi); DBUG_PRINT("info", ("error: %d", error)); DBUG_ASSERT(error != HA_ERR_RECORD_DELETED); @@ -1696,7 +1696,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end)); if (!m_curr_row_end && !error) - unpack_current_row(rli); + unpack_current_row(rgi); // at this moment m_curr_row_end should be set DBUG_ASSERT(error || m_curr_row_end != NULL); @@ -1733,7 +1733,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) rollback at the caller along with sbr. */ thd->reset_current_stmt_binlog_format_row(); - const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); + rgi->cleanup_context(thd, error); thd->is_slave_error= 1; DBUG_RETURN(error); } @@ -1812,7 +1812,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) */ thd->reset_current_stmt_binlog_format_row(); - const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0); + rgi->cleanup_context(thd, 0); } DBUG_RETURN(error); @@ -1998,8 +1998,7 @@ void Old_rows_log_event::print_helper(FILE *file, */ int -Old_rows_log_event::write_row(const Relay_log_info *const rli, - const bool overwrite) +Old_rows_log_event::write_row(rpl_group_info *rgi, const bool overwrite) { DBUG_ENTER("write_row"); DBUG_ASSERT(m_table != NULL && thd != NULL); @@ -2016,7 +2015,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli, DBUG_RETURN(error); /* unpack row into table->record[0] */ - error= unpack_current_row(rli); // TODO: how to handle errors? + error= unpack_current_row(rgi); // TODO: how to handle errors? #ifndef DBUG_OFF DBUG_DUMP("record[0]", table->record[0], table->s->reclength); @@ -2123,7 +2122,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli, if (!get_flags(COMPLETE_ROWS_F)) { restore_record(table,record[1]); - error= unpack_current_row(rli); + error= unpack_current_row(rgi); } #ifndef DBUG_OFF @@ -2218,7 +2217,7 @@ Old_rows_log_event::write_row(const Relay_log_info *const rli, for any following update/delete command. */ -int Old_rows_log_event::find_row(const Relay_log_info *rli) +int Old_rows_log_event::find_row(rpl_group_info *rgi) { DBUG_ENTER("find_row"); @@ -2231,7 +2230,7 @@ int Old_rows_log_event::find_row(const Relay_log_info *rli) // TODO: shall we check and report errors here? prepare_record(table, m_width, FALSE /* don't check errors */); - error= unpack_current_row(rli); + error= unpack_current_row(rgi); #ifndef DBUG_OFF DBUG_PRINT("info",("looking for the following record")); @@ -2603,10 +2602,10 @@ Write_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabili int -Write_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +Write_rows_log_event_old::do_exec_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table != NULL); - int error= write_row(rli, TRUE /* overwrite */); + int error= write_row(rgi, TRUE /* overwrite */); if (error && !thd->net.last_errno) thd->net.last_errno= error; @@ -2705,12 +2704,12 @@ Delete_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabil } -int Delete_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +int Delete_rows_log_event_old::do_exec_row(rpl_group_info *rgi) { int error; DBUG_ASSERT(m_table != NULL); - if (!(error= find_row(rli))) + if (!(error= find_row(rgi))) { /* Delete the record found, located in record[0] @@ -2804,11 +2803,11 @@ Update_rows_log_event_old::do_after_row_operations(const Slave_reporting_capabil int -Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) +Update_rows_log_event_old::do_exec_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table != NULL); - int error= find_row(rli); + int error= find_row(rgi); if (error) { /* @@ -2816,7 +2815,7 @@ Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) able to skip to the next pair of updates */ m_curr_row= m_curr_row_end; - unpack_current_row(rli); + unpack_current_row(rgi); return error; } @@ -2834,7 +2833,7 @@ Update_rows_log_event_old::do_exec_row(const Relay_log_info *const rli) store_record(m_table,record[1]); m_curr_row= m_curr_row_end; - error= unpack_current_row(rli); // this also updates m_curr_row_end + error= unpack_current_row(rgi); // this also updates m_curr_row_end /* Now we have the right row to update. The old row (the one we're diff --git a/sql/log_event_old.h b/sql/log_event_old.h index 7c35b875dc4..01b80439fa1 100644 --- a/sql/log_event_old.h +++ b/sql/log_event_old.h @@ -195,15 +195,15 @@ protected: const uchar *m_curr_row_end; /* One-after the end of the current row */ uchar *m_key; /* Buffer to keep key value during searches */ - int find_row(const Relay_log_info *const); - int write_row(const Relay_log_info *const, const bool); + int find_row(rpl_group_info *); + int write_row(rpl_group_info *, const bool); // Unpack the current row into m_table->record[0] - int unpack_current_row(const Relay_log_info *const rli) + int unpack_current_row(rpl_group_info *rgi) { DBUG_ASSERT(m_table); ASSERT_OR_RETURN_ERROR(m_curr_row < m_rows_end, HA_ERR_CORRUPT_EVENT); - int const result= ::unpack_row(rli, m_table, m_width, m_curr_row, + int const result= ::unpack_row(rgi, m_table, m_width, m_curr_row, m_rows_end, &m_cols, &m_curr_row_end, &m_master_reclength); ASSERT_OR_RETURN_ERROR(m_curr_row_end <= m_rows_end, HA_ERR_CORRUPT_EVENT); @@ -267,7 +267,7 @@ private: 0 if execution succeeded, 1 if execution failed. */ - virtual int do_exec_row(const Relay_log_info *const rli) = 0; + virtual int do_exec_row(rpl_group_info *rgi) = 0; #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ /********** END OF CUT & PASTE FROM Rows_log_event **********/ @@ -324,7 +324,7 @@ private: RETURN VALUE Error code, if something went wrong, 0 otherwise. */ - virtual int do_prepare_row(THD*, Relay_log_info const*, TABLE*, + virtual int do_prepare_row(THD*, rpl_group_info*, TABLE*, uchar const *row_start, uchar const **row_end) = 0; @@ -387,7 +387,7 @@ private: #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) virtual int do_before_row_operations(const Slave_reporting_capability *const); virtual int do_after_row_operations(const Slave_reporting_capability *const,int); - virtual int do_exec_row(const Relay_log_info *const); + virtual int do_exec_row(rpl_group_info *); #endif /********** END OF CUT & PASTE FROM Write_rows_log_event **********/ @@ -409,7 +409,7 @@ private: // primitives for old version of do_apply_event() virtual int do_before_row_operations(TABLE *table); virtual int do_after_row_operations(TABLE *table, int error); - virtual int do_prepare_row(THD*, Relay_log_info const*, TABLE*, + virtual int do_prepare_row(THD*, rpl_group_info*, TABLE*, uchar const *row_start, uchar const **row_end); virtual int do_exec_row(TABLE *table); @@ -463,7 +463,7 @@ protected: #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) virtual int do_before_row_operations(const Slave_reporting_capability *const); virtual int do_after_row_operations(const Slave_reporting_capability *const,int); - virtual int do_exec_row(const Relay_log_info *const); + virtual int do_exec_row(rpl_group_info *); #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ /********** END OF CUT & PASTE FROM Update_rows_log_event **********/ @@ -487,7 +487,7 @@ private: // primitives for old version of do_apply_event() virtual int do_before_row_operations(TABLE *table); virtual int do_after_row_operations(TABLE *table, int error); - virtual int do_prepare_row(THD*, Relay_log_info const*, TABLE*, + virtual int do_prepare_row(THD*, rpl_group_info*, TABLE*, uchar const *row_start, uchar const **row_end); virtual int do_exec_row(TABLE *table); #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ @@ -538,7 +538,7 @@ protected: #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) virtual int do_before_row_operations(const Slave_reporting_capability *const); virtual int do_after_row_operations(const Slave_reporting_capability *const,int); - virtual int do_exec_row(const Relay_log_info *const); + virtual int do_exec_row(rpl_group_info *); #endif /********** END CUT & PASTE FROM Delete_rows_log_event **********/ @@ -562,7 +562,7 @@ private: // primitives for old version of do_apply_event() virtual int do_before_row_operations(TABLE *table); virtual int do_after_row_operations(TABLE *table, int error); - virtual int do_prepare_row(THD*, Relay_log_info const*, TABLE*, + virtual int do_prepare_row(THD*, rpl_group_info*, TABLE*, uchar const *row_start, uchar const **row_end); virtual int do_exec_row(TABLE *table); #endif diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 7cf2c9162ff..b4c1f6c941a 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -72,6 +72,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, /* ToDo: Access to thd, and what about rli, split out a parallel part? */ mysql_mutex_lock(&rli->data_lock); err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); + thd->rgi_slave= NULL; /* ToDo: error handling. */ } @@ -487,12 +488,22 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry) } +static void +free_rpl_parallel_entry(void *element) +{ + rpl_parallel_entry *e= (rpl_parallel_entry *)element; + mysql_cond_destroy(&e->COND_parallel_entry); + mysql_mutex_destroy(&e->LOCK_parallel_entry); + my_free(e); +} + + rpl_parallel::rpl_parallel() : current(NULL) { my_hash_init(&domain_hash, &my_charset_bin, 32, offsetof(rpl_parallel_entry, domain_id), sizeof(uint32), - NULL, NULL, HASH_UNIQUE); + NULL, free_rpl_parallel_entry, HASH_UNIQUE); } @@ -667,6 +678,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) qev->rgi= serial_rgi; rpt_handle_event(qev, NULL); delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev); + my_free(qev); return false; } diff --git a/sql/rpl_record.cc b/sql/rpl_record.cc index 99bf8a82004..12df72a251b 100644 --- a/sql/rpl_record.cc +++ b/sql/rpl_record.cc @@ -186,7 +186,7 @@ pack_row(TABLE *table, MY_BITMAP const* cols, */ #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) int -unpack_row(Relay_log_info const *rli, +unpack_row(rpl_group_info *rgi, TABLE *table, uint const colcnt, uchar const *const row_data, uchar const *const row_buffer_end, MY_BITMAP const *cols, @@ -214,18 +214,18 @@ unpack_row(Relay_log_info const *rli, uint i= 0; table_def *tabledef= NULL; TABLE *conv_table= NULL; - bool table_found= rli && rli->get_table_data(table, &tabledef, &conv_table); + bool table_found= rgi && rgi->get_table_data(table, &tabledef, &conv_table); DBUG_PRINT("debug", ("Table data: table_found: %d, tabldef: %p, conv_table: %p", table_found, tabledef, conv_table)); DBUG_ASSERT(table_found); /* - If rli is NULL it means that there is no source table and that the + If rgi is NULL it means that there is no source table and that the row shall just be unpacked without doing any checks. This feature is used by MySQL Backup, but can be used for other purposes as well. */ - if (rli && !table_found) + if (rgi && !table_found) DBUG_RETURN(HA_ERR_GENERIC); for (field_ptr= begin_ptr ; field_ptr < end_ptr && *field_ptr ; ++field_ptr) @@ -313,7 +313,7 @@ unpack_row(Relay_log_info const *rli, (int) (pack_ptr - old_pack_ptr))); if (!pack_ptr) { - rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT, + rgi->rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT, "Could not read field '%s' of table '%s.%s'", f->field_name, table->s->db.str, table->s->table_name.str); diff --git a/sql/rpl_record.h b/sql/rpl_record.h index 4b34dcd0a96..7369edf1379 100644 --- a/sql/rpl_record.h +++ b/sql/rpl_record.h @@ -21,7 +21,7 @@ #include <rpl_reporting.h> #include "my_global.h" /* uchar */ -class Relay_log_info; +class rpl_group_info; struct TABLE; typedef struct st_bitmap MY_BITMAP; @@ -31,7 +31,7 @@ size_t pack_row(TABLE* table, MY_BITMAP const* cols, #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int unpack_row(Relay_log_info const *rli, +int unpack_row(rpl_group_info *rgi, TABLE *table, uint const colcnt, uchar const *const row_data, uchar const *row_buffer_end, MY_BITMAP const *cols, diff --git a/sql/rpl_record_old.cc b/sql/rpl_record_old.cc index fa0c49b413c..5afa529a63c 100644 --- a/sql/rpl_record_old.cc +++ b/sql/rpl_record_old.cc @@ -88,7 +88,7 @@ pack_row_old(TABLE *table, MY_BITMAP const* cols, */ #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) int -unpack_row_old(Relay_log_info *rli, +unpack_row_old(rpl_group_info *rgi, TABLE *table, uint const colcnt, uchar *record, uchar const *row, const uchar *row_buffer_end, MY_BITMAP const *cols, @@ -141,7 +141,7 @@ unpack_row_old(Relay_log_info *rli, f->move_field_offset(-offset); if (!ptr) { - rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT, + rgi->rli->report(ERROR_LEVEL, ER_SLAVE_CORRUPT_EVENT, "Could not read field `%s` of table `%s`.`%s`", f->field_name, table->s->db.str, table->s->table_name.str); @@ -183,7 +183,7 @@ unpack_row_old(Relay_log_info *rli, if (event_type == WRITE_ROWS_EVENT && ((*field_ptr)->flags & mask) == mask) { - rli->report(ERROR_LEVEL, ER_NO_DEFAULT_FOR_FIELD, + rgi->rli->report(ERROR_LEVEL, ER_NO_DEFAULT_FOR_FIELD, "Field `%s` of table `%s`.`%s` " "has no default value and cannot be NULL", (*field_ptr)->field_name, table->s->db.str, diff --git a/sql/rpl_record_old.h b/sql/rpl_record_old.h index ea981fb23c3..34ef9f11c47 100644 --- a/sql/rpl_record_old.h +++ b/sql/rpl_record_old.h @@ -23,7 +23,7 @@ size_t pack_row_old(TABLE *table, MY_BITMAP const* cols, uchar *row_data, const uchar *record); #ifdef HAVE_REPLICATION -int unpack_row_old(Relay_log_info *rli, +int unpack_row_old(rpl_group_info *rgi, TABLE *table, uint const colcnt, uchar *record, uchar const *row, uchar const *row_buffer_end, MY_BITMAP const *cols, diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 49547718230..c4b898f74e3 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -59,7 +59,6 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) abort_pos_wait(0), slave_run_id(0), sql_thd(0), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), - tables_to_lock(0), tables_to_lock_count(0), last_event_start_time(0), m_flags(0), row_stmt_start_timestamp(0), long_find_row_note_printed(false) { @@ -135,8 +134,6 @@ int init_relay_log_info(Relay_log_info* rli, rli->abort_pos_wait=0; rli->log_space_limit= relay_log_space_limit; rli->log_space_total= 0; - rli->tables_to_lock= 0; - rli->tables_to_lock_count= 0; char pattern[FN_REFLEN]; (void) my_realpath(pattern, slave_load_tmpdir, 0); @@ -1261,129 +1258,6 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, } #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -void Relay_log_info::cleanup_context(THD *thd, bool error) -{ - DBUG_ENTER("Relay_log_info::cleanup_context"); - - /* - In parallel replication, different THDs can be used from different - parallel threads. But in single-threaded mode, only the THD of the main - SQL thread is allowed. - */ - DBUG_ASSERT(opt_slave_parallel_threads > 0 || sql_thd == thd); - /* - 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them, - may have opened tables, which we cannot be sure have been closed (because - maybe the Rows_log_event have not been found or will not be, because slave - SQL thread is stopping, or relay log has a missing tail etc). So we close - all thread's tables. And so the table mappings have to be cancelled. - 2) Rows_log_event::do_apply_event() may even have started statements or - transactions on them, which we need to rollback in case of error. - 3) If finding a Format_description_log_event after a BEGIN, we also need - to rollback before continuing with the next events. - 4) so we need this "context cleanup" function. - */ - if (error) - { - trans_rollback_stmt(thd); // if a "statement transaction" - trans_rollback(thd); // if a "real transaction" - } - m_table_map.clear_tables(); - slave_close_thread_tables(thd); - if (error) - thd->mdl_context.release_transactional_locks(); - clear_flag(IN_STMT); - /* - Cleanup for the flags that have been set at do_apply_event. - */ - thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; - thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS; - - /* - Reset state related to long_find_row notes in the error log: - - timestamp - - flag that decides whether the slave prints or not - */ - reset_row_stmt_start_timestamp(); - unset_long_find_row_note_printed(); - - DBUG_VOID_RETURN; -} - -void Relay_log_info::clear_tables_to_lock() -{ - DBUG_ENTER("Relay_log_info::clear_tables_to_lock()"); -#ifndef DBUG_OFF - /** - When replicating in RBR and MyISAM Merge tables are involved - open_and_lock_tables (called in do_apply_event) appends the - base tables to the list of tables_to_lock. Then these are - removed from the list in close_thread_tables (which is called - before we reach this point). - - This assertion just confirms that we get no surprises at this - point. - */ - uint i=0; - for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ; - DBUG_ASSERT(i == tables_to_lock_count); -#endif - - while (tables_to_lock) - { - uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock); - if (tables_to_lock->m_tabledef_valid) - { - tables_to_lock->m_tabledef.table_def::~table_def(); - tables_to_lock->m_tabledef_valid= FALSE; - } - - /* - If blob fields were used during conversion of field values - from the master table into the slave table, then we need to - free the memory used temporarily to store their values before - copying into the slave's table. - */ - if (tables_to_lock->m_conv_table) - free_blobs(tables_to_lock->m_conv_table); - - tables_to_lock= - static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global); - tables_to_lock_count--; - my_free(to_free); - } - DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0); - DBUG_VOID_RETURN; -} - -void Relay_log_info::slave_close_thread_tables(THD *thd) -{ - DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)"); - thd->stmt_da->can_overwrite_status= TRUE; - thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); - thd->stmt_da->can_overwrite_status= FALSE; - - close_thread_tables(thd); - /* - - If inside a multi-statement transaction, - defer the release of metadata locks until the current - transaction is either committed or rolled back. This prevents - other statements from modifying the table for the entire - duration of this transaction. This provides commit ordering - and guarantees serializability across multiple transactions. - - If in autocommit mode, or outside a transactional context, - automatically release metadata locks of the current statement. - */ - if (! thd->in_multi_stmt_transaction_mode()) - thd->mdl_context.release_transactional_locks(); - else - thd->mdl_context.release_statement_locks(); - - clear_tables_to_lock(); - DBUG_VOID_RETURN; -} - - int rpl_load_gtid_slave_state(THD *thd) { @@ -1539,7 +1413,8 @@ end: rpl_group_info::rpl_group_info(Relay_log_info *rli_) : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0), wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0), - deferred_events(NULL), m_annotate_event(0) + deferred_events(NULL), m_annotate_event(0), tables_to_lock(0), + tables_to_lock_count(0) { bzero(¤t_gtid, sizeof(current_gtid)); } @@ -1613,4 +1488,126 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi, } } + +void rpl_group_info::cleanup_context(THD *thd, bool error) +{ + DBUG_ENTER("Relay_log_info::cleanup_context"); + + DBUG_ASSERT(this->thd == thd); + /* + 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them, + may have opened tables, which we cannot be sure have been closed (because + maybe the Rows_log_event have not been found or will not be, because slave + SQL thread is stopping, or relay log has a missing tail etc). So we close + all thread's tables. And so the table mappings have to be cancelled. + 2) Rows_log_event::do_apply_event() may even have started statements or + transactions on them, which we need to rollback in case of error. + 3) If finding a Format_description_log_event after a BEGIN, we also need + to rollback before continuing with the next events. + 4) so we need this "context cleanup" function. + */ + if (error) + { + trans_rollback_stmt(thd); // if a "statement transaction" + trans_rollback(thd); // if a "real transaction" + } + m_table_map.clear_tables(); + slave_close_thread_tables(thd); + if (error) + thd->mdl_context.release_transactional_locks(); + /* ToDo: This must clear the flag in rgi, not rli. */ + rli->clear_flag(Relay_log_info::IN_STMT); + /* + Cleanup for the flags that have been set at do_apply_event. + */ + thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; + thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS; + + /* + Reset state related to long_find_row notes in the error log: + - timestamp + - flag that decides whether the slave prints or not + */ + rli->reset_row_stmt_start_timestamp(); + rli->unset_long_find_row_note_printed(); + + DBUG_VOID_RETURN; +} + + +void rpl_group_info::clear_tables_to_lock() +{ + DBUG_ENTER("Relay_log_info::clear_tables_to_lock()"); +#ifndef DBUG_OFF + /** + When replicating in RBR and MyISAM Merge tables are involved + open_and_lock_tables (called in do_apply_event) appends the + base tables to the list of tables_to_lock. Then these are + removed from the list in close_thread_tables (which is called + before we reach this point). + + This assertion just confirms that we get no surprises at this + point. + */ + uint i=0; + for (TABLE_LIST *ptr= tables_to_lock ; ptr ; ptr= ptr->next_global, i++) ; + DBUG_ASSERT(i == tables_to_lock_count); +#endif + + while (tables_to_lock) + { + uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock); + if (tables_to_lock->m_tabledef_valid) + { + tables_to_lock->m_tabledef.table_def::~table_def(); + tables_to_lock->m_tabledef_valid= FALSE; + } + + /* + If blob fields were used during conversion of field values + from the master table into the slave table, then we need to + free the memory used temporarily to store their values before + copying into the slave's table. + */ + if (tables_to_lock->m_conv_table) + free_blobs(tables_to_lock->m_conv_table); + + tables_to_lock= + static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global); + tables_to_lock_count--; + my_free(to_free); + } + DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0); + DBUG_VOID_RETURN; +} + + +void rpl_group_info::slave_close_thread_tables(THD *thd) +{ + DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)"); + thd->stmt_da->can_overwrite_status= TRUE; + thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); + thd->stmt_da->can_overwrite_status= FALSE; + + close_thread_tables(thd); + /* + - If inside a multi-statement transaction, + defer the release of metadata locks until the current + transaction is either committed or rolled back. This prevents + other statements from modifying the table for the entire + duration of this transaction. This provides commit ordering + and guarantees serializability across multiple transactions. + - If in autocommit mode, or outside a transactional context, + automatically release metadata locks of the current statement. + */ + if (! thd->in_multi_stmt_transaction_mode()) + thd->mdl_context.release_transactional_locks(); + else + thd->mdl_context.release_statement_locks(); + + clear_tables_to_lock(); + DBUG_VOID_RETURN; +} + + #endif diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 4d954d1c8aa..10181cc6fab 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -361,27 +361,6 @@ public: group_relay_log_pos); } - RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */ - uint tables_to_lock_count; /* RBR: Count of tables to lock */ - table_mapping m_table_map; /* RBR: Mapping table-id to table */ - - bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const - { - DBUG_ASSERT(tabledef_var && conv_table_var); - for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global) - if (ptr->table == table_arg) - { - *tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef; - *conv_table_var= static_cast<RPL_TABLE_LIST*>(ptr)->m_conv_table; - DBUG_PRINT("debug", ("Fetching table data for table %s.%s:" - " tabledef: %p, conv_table: %p", - table_arg->s->db.str, table_arg->s->table_name.str, - *tabledef_var, *conv_table_var)); - return true; - } - return false; - } - /* Last charset (6 bytes) seen by slave SQL thread is cached here; it helps the thread save 3 get_charset() per Query_log_event if the charset is not @@ -391,10 +370,6 @@ public: void cached_charset_invalidate(); bool cached_charset_compare(char *charset) const; - void cleanup_context(THD *, bool); - void slave_close_thread_tables(THD *); - void clear_tables_to_lock(); - /* Used to defer stopping the SQL thread to give it a chance to finish up the current group of events. @@ -588,6 +563,10 @@ struct rpl_group_info Annotate_rows_log_event *m_annotate_event; + RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */ + uint tables_to_lock_count; /* RBR: Count of tables to lock */ + table_mapping m_table_map; /* RBR: Mapping table-id to table */ + rpl_group_info(Relay_log_info *rli_); ~rpl_group_info(); @@ -649,6 +628,26 @@ struct rpl_group_info } } + bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const + { + DBUG_ASSERT(tabledef_var && conv_table_var); + for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global) + if (ptr->table == table_arg) + { + *tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef; + *conv_table_var= static_cast<RPL_TABLE_LIST*>(ptr)->m_conv_table; + DBUG_PRINT("debug", ("Fetching table data for table %s.%s:" + " tabledef: %p, conv_table: %p", + table_arg->s->db.str, table_arg->s->table_name.str, + *tabledef_var, *conv_table_var)); + return true; + } + return false; + } + + void clear_tables_to_lock(); + void cleanup_context(THD *, bool); + void slave_close_thread_tables(THD *); }; diff --git a/sql/slave.cc b/sql/slave.cc index e0cc595213d..c807561b0b3 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3307,7 +3307,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, else { exec_res= 0; - rli->cleanup_context(thd, 1); + serial_rgi->cleanup_context(thd, 1); /* chance for concurrent connection to get more locks */ slave_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE), sql_slave_killed, rli); @@ -3983,7 +3983,7 @@ pthread_handler_t handle_slave_sql(void *arg) Master_info *mi= ((Master_info*)arg); Relay_log_info* rli = &mi->rli; const char *errmsg; - rpl_group_info serial_rgi(rli); + rpl_group_info *serial_rgi; // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff my_thread_init(); @@ -3992,10 +3992,11 @@ pthread_handler_t handle_slave_sql(void *arg) LINT_INIT(saved_master_log_pos); LINT_INIT(saved_log_pos); + serial_rgi= new rpl_group_info(rli); thd = new THD; // note that contructor of THD uses DBUG_ ! thd->thread_stack = (char*)&thd; // remember where our stack is thd->rpl_filter = mi->rpl_filter; - serial_rgi.thd= thd; + serial_rgi->thd= thd; DBUG_ASSERT(rli->inited); DBUG_ASSERT(rli->mi == mi); @@ -4025,10 +4026,10 @@ pthread_handler_t handle_slave_sql(void *arg) goto err_during_init; } thd->init_for_queries(); - thd->rgi_slave= &serial_rgi; - if ((serial_rgi.deferred_events_collecting= mi->rpl_filter->is_on())) + thd->rgi_slave= serial_rgi; + if ((serial_rgi->deferred_events_collecting= mi->rpl_filter->is_on())) { - serial_rgi.deferred_events= new Deferred_log_events(rli); + serial_rgi->deferred_events= new Deferred_log_events(rli); } thd->temporary_tables = rli->save_temporary_tables; // restore temp tables @@ -4211,7 +4212,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, saved_skip= 0; } - if (exec_relay_log_event(thd, rli, &serial_rgi)) + if (exec_relay_log_event(thd, rli, serial_rgi)) { DBUG_PRINT("info", ("exec_relay_log_event() failed")); // do not scare the user if SQL thread was simply killed or stopped @@ -4338,7 +4339,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ must "proactively" clear playgrounds: */ thd->clear_error(); - rli->cleanup_context(thd, 1); + serial_rgi->cleanup_context(thd, 1); /* Some extra safety, which should not been needed (normally, event deletion should already have done these assignments (each event which sets these @@ -4379,6 +4380,7 @@ err_during_init: mysql_mutex_lock(&LOCK_thread_count); THD_CHECK_SENTRY(thd); delete thd; + delete serial_rgi; mysql_mutex_unlock(&LOCK_thread_count); /* Note: the order of the broadcast and unlock calls below (first broadcast, then unlock) diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 04cb4adcb2c..4f41b942345 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -273,7 +273,7 @@ void mysql_client_binlog_statement(THD* thd) end: thd->variables.option_bits= thd_options; - rli->slave_close_thread_tables(thd); + rgi->slave_close_thread_tables(thd); my_free(buf); DBUG_VOID_RETURN; } |