diff options
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel_retry.result | 1 | ||||
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_perfschema_applier_status_by_worker.result | 42 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel_retry.test | 6 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_perfschema_applier_status_by_worker.test | 96 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 54 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 35 | ||||
-rw-r--r-- | sql/rpl_reporting.cc | 2 | ||||
-rw-r--r-- | storage/perfschema/table_replication_applier_status_by_worker.cc | 26 |
8 files changed, 260 insertions, 2 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel_retry.result b/mysql-test/suite/rpl/r/rpl_parallel_retry.result index 29657a13eb7..2cc4044a2cd 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel_retry.result +++ b/mysql-test/suite/rpl/r/rpl_parallel_retry.result @@ -127,6 +127,7 @@ include/wait_for_slave_sql_error.inc [errno=1213] SET GLOBAL debug_dbug=@old_dbug; retries 10 +include/assert.inc [Performance Schema retries should match with actual retries] SELECT * FROM t1 ORDER BY a; a b 1 3 diff --git a/mysql-test/suite/rpl/r/rpl_perfschema_applier_status_by_worker.result b/mysql-test/suite/rpl/r/rpl_perfschema_applier_status_by_worker.result index 85f7d52dff7..3545dc606ac 100644 --- a/mysql-test/suite/rpl/r/rpl_perfschema_applier_status_by_worker.result +++ b/mysql-test/suite/rpl/r/rpl_perfschema_applier_status_by_worker.result @@ -38,6 +38,48 @@ include/wait_for_slave_to_stop.inc RESET SLAVE ALL; SET default_master_connection=''; CHANGE MASTER TO MASTER_USER='root', MASTER_HOST='127.0.0.1',MASTER_PORT=$MASTER_MYPORT; +include/start_slave.inc + +# Introduce an error in the worker thread and check for the correctness +# of error number, message and timestamp fields. + +connection master; +use test; +create table t(a int primary key); +connection slave; +drop table t; +connection master; +insert into t values(1); +connection slave; +include/wait_for_slave_sql_error.inc [errno=1146] + +# Extract the error related fields from SSS and PS table and compare +# them for correctness. + +include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.] +Last_Error_Message +Error 'Table 'test.t' doesn't exist' on query. Default database: 'test'. Query: 'insert into t values(1)' + +# Verify that the error fields are preserved after STOP SLAVE. + + +# 1. Verify that thread_id changes to NULL and service_state to "off" on +# STOP SLAVE. + +include/assert.inc [After STOP SLAVE, thread_id should be NULL] +include/assert.inc [So, Service_State after STOP SLAVE should be "OFF".] + +# 2. Extract the worker_id and the error related fields from SSS and PS +# table and compare them. These fields should preserve their values. + +include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.] +Last_Error_Message +Error 'Table 'test.t' doesn't exist' on query. Default database: 'test'. Query: 'insert into t values(1)' +include/stop_slave.inc +RESET SLAVE; +connection master; +DROP TABLE t; +RESET MASTER; # Verify that number of rows in 'replication_applier_status_by_worker' table match with # number of slave_parallel_workers. diff --git a/mysql-test/suite/rpl/t/rpl_parallel_retry.test b/mysql-test/suite/rpl/t/rpl_parallel_retry.test index 048c6e1df9b..8ea48a2f176 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel_retry.test +++ b/mysql-test/suite/rpl/t/rpl_parallel_retry.test @@ -149,6 +149,12 @@ let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', V --disable_query_log eval SELECT $new_retry - $old_retry AS retries; --enable_query_log +let $ps_value= query_get_value(select last_trans_retry_count from + performance_schema.replication_applier_status_by_worker where + last_trans_retry_count > 0, last_trans_retry_count, 1); +let $assert_text= Performance Schema retries should match with actual retries; +let $assert_cond= "$ps_value" = $new_retry - $old_retry; +source include/assert.inc; SELECT * FROM t1 ORDER BY a; STOP SLAVE IO_THREAD; diff --git a/mysql-test/suite/rpl/t/rpl_perfschema_applier_status_by_worker.test b/mysql-test/suite/rpl/t/rpl_perfschema_applier_status_by_worker.test index b59ab826bcc..6ded461db30 100644 --- a/mysql-test/suite/rpl/t/rpl_perfschema_applier_status_by_worker.test +++ b/mysql-test/suite/rpl/t/rpl_perfschema_applier_status_by_worker.test @@ -133,6 +133,102 @@ STOP SLAVE 'slave1'; RESET SLAVE ALL; SET default_master_connection=''; evalp CHANGE MASTER TO MASTER_USER='root', MASTER_HOST='127.0.0.1',MASTER_PORT=$MASTER_MYPORT; +--source include/start_slave.inc + +--echo +--echo # Introduce an error in the worker thread and check for the correctness +--echo # of error number, message and timestamp fields. +--echo + +# Cause an error in Worker thread. +# 1) Create a table 't' at master, replicate at slave. +# 2) Drop table 't' at slave only. +# 3) Insert a value in table 't' on master and replicate on slave. +# Since slave doesnt have table 't' anymore, worker thread will report an error. + +--connection master +use test; +create table t(a int primary key); +sync_slave_with_master; +drop table t; +--connection master +insert into t values(1); +--connection slave +let $slave_sql_errno=1146; +source include/wait_for_slave_sql_error.inc; + +--echo +--echo # Extract the error related fields from SSS and PS table and compare +--echo # them for correctness. +--echo + +let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1); +let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_worker, Last_Error_Number, 1); +let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.; +let $assert_cond= "$sss_value" = "$ps_value"; +source include/assert.inc; + +--disable_query_log +--replace_regex /master-bin.[0-9]+/FILENAME/ /end_log_pos [0-9]+/end_log_pos POSITION/ +select Last_Error_Message from performance_schema.replication_applier_status_by_worker; +--enable_query_log + +--echo +--echo # Verify that the error fields are preserved after STOP SLAVE. +--echo + +--echo +--echo # 1. Verify that thread_id changes to NULL and service_state to "off" on +--echo # STOP SLAVE. +--echo + +let $ps_value= query_get_value(select thread_id from performance_schema.replication_applier_status_by_worker, thread_id, 1); +let $assert_text= After STOP SLAVE, thread_id should be NULL; +let $assert_cond= "$ps_value" = "NULL"; +source include/assert.inc; + +let $ps_value= query_get_value(select service_state from performance_schema.replication_applier_status_by_worker, service_state, 1); +let $assert_text= So, Service_State after STOP SLAVE should be "OFF".; +let $assert_cond= "$ps_value"= "OFF"; +source include/assert.inc; + +--echo +--echo # 2. Extract the worker_id and the error related fields from SSS and PS +--echo # table and compare them. These fields should preserve their values. +--echo + +let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1); +let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_worker, Last_Error_Number, 1); +let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.; +let $assert_cond= "$sss_value" = "$ps_value"; +source include/assert.inc; + +--disable_query_log +--replace_regex /master-bin.[0-9]+/FILENAME/ /end_log_pos [0-9]+/end_log_pos POSITION/ +select Last_Error_Message from performance_schema.replication_applier_status_by_worker; +--enable_query_log + +# The timestamp format is slightly different in SSS and PS. +# SSS => YYMMDD HH:MM:SS +# PS => YYYY-MM-DD HH:MM:SS +# To match the two, we get rid of hyphons from PS output and first two digits +# the year field so that it can be matched directly. + +#--- TODO: Can we include Last_SQL_Error_Timestamp as part of SSS + +#let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Error_Timestamp, 1); +#let $ps_value= query_get_value(select Last_Error_Timestamp from performance_schema.replication_applier_status_by_worker, Last_Error_Timestamp, 1); +#let $ps_value_without_hyphons= `SELECT REPLACE("$ps_value", '-', '')`; +#let $ps_value_in_sss_format= `select substring("$ps_value_without_hyphons", 3)`; +#let $assert_text= Value returned by SSS and PS table for Last_Error_Timestamp should be same.; +#let $assert_cond= "$sss_value" = "$ps_value_in_sss_format"; +#source include/assert.inc; + +--source include/stop_slave.inc +RESET SLAVE; +--connection master +DROP TABLE t; +RESET MASTER; --echo --echo # Verify that number of rows in 'replication_applier_status_by_worker' table match with diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 44fba4e30e7..8be1964b762 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1752,6 +1752,7 @@ int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) { int rc= 0; + struct pool_bkp_for_pfs* bkp= &pool->pfs_bkp; if ((rc= pool_mark_busy(pool, current_thd))) return rc; // killed @@ -1761,6 +1762,23 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) pool_mark_not_busy(pool); rc= rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads, 0); + if (!rc) + { + if (pool->count) + { + if (bkp->inited) + { + if (bkp->count != pool->count) + { + bkp->destroy(); + bkp->init(pool->count); + } + } + else + bkp->init(pool->count); + } + } + } else { @@ -2026,7 +2044,8 @@ rpl_parallel_thread::rpl_parallel_thread() rpl_parallel_thread_pool::rpl_parallel_thread_pool() - : threads(0), free_list(0), count(0), inited(false), busy(false) + : threads(0), free_list(0), count(0), inited(false), busy(false), + pfs_bkp{0, false, NULL} { } @@ -2057,6 +2076,7 @@ void rpl_parallel_thread_pool::destroy() { deactivate(); + pfs_bkp.destroy(); destroy_cond_mutex(); } @@ -2125,6 +2145,37 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt) mysql_mutex_unlock(&LOCK_rpl_thread_pool); } +void +rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli) +{ + if (pfs_bkp.inited) + { + for(uint i=0; i<count;i++) + { + rpl_parallel_thread *rpt, *pfs_rpt; + rpt= threads[i]; + pfs_rpt= pfs_bkp.rpl_thread_arr[i]; + if (rpt->channel_name_length) + { + pfs_rpt->channel_name_length= rpt->channel_name_length; + strmake(pfs_rpt->channel_name, rpt->channel_name, + rpt->channel_name_length); + } + pfs_rpt->thd= rpt->thd; + pfs_rpt->last_seen_gtid= rpt->last_seen_gtid; + if (rli->err_thread_id && rpt->thd->thread_id == rli->err_thread_id) + { + pfs_rpt->last_error_number= rli->last_error().number; + strmake(pfs_rpt->last_error_message, + rli->last_error().message, sizeof(rli->last_error().message)); + pfs_rpt->last_error_timestamp= rli->last_error().skr*1000000; + } + pfs_rpt->running= false; + pfs_rpt->worker_idle_time= rpt->get_worker_idle_time(); + pfs_rpt->last_trans_retry_count= rpt->last_trans_retry_count; + } + } +} /* Obtain a worker thread that we can queue an event to. @@ -2393,6 +2444,7 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) STRING_WITH_LEN("now SIGNAL wait_for_done_waiting")); };); + global_rpl_thread_pool.copy_pool_for_pfs(rli); for (i= 0; i < domain_hash.records; ++i) { e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 4f7bd49120a..cc7795b4b0d 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -7,6 +7,7 @@ struct rpl_parallel; struct rpl_parallel_entry; struct rpl_parallel_thread_pool; +extern struct rpl_parallel_thread_pool pool_bkp_for_pfs; class Relay_log_info; struct inuse_relaylog; @@ -257,6 +258,38 @@ struct rpl_parallel_thread { }; +struct pool_bkp_for_pfs{ + uint32 count; + bool inited; + struct rpl_parallel_thread **rpl_thread_arr; + void init(uint32 thd_count) + { + DBUG_ASSERT(thd_count); + rpl_thread_arr= (rpl_parallel_thread **) + my_malloc(PSI_INSTRUMENT_ME, + thd_count * sizeof(rpl_parallel_thread*), + MYF(MY_WME | MY_ZEROFILL)); + for (uint i=0; i<thd_count; i++) + rpl_thread_arr[i]= (rpl_parallel_thread *) + my_malloc(PSI_INSTRUMENT_ME, sizeof(rpl_parallel_thread), + MYF(MY_WME | MY_ZEROFILL)); + count= thd_count; + inited= true; + } + + void destroy() + { + if (inited) + { + for (uint i=0; i<count; i++) + my_free(rpl_thread_arr[i]); + + my_free(rpl_thread_arr); + rpl_thread_arr= NULL; + } + } +}; + struct rpl_parallel_thread_pool { struct rpl_parallel_thread **threads; struct rpl_parallel_thread *free_list; @@ -270,8 +303,10 @@ struct rpl_parallel_thread_pool { is in progress. */ bool busy; + struct pool_bkp_for_pfs pfs_bkp; rpl_parallel_thread_pool(); + void copy_pool_for_pfs(Relay_log_info *rli); int init(uint32 size); void destroy(); void deactivate(); diff --git a/sql/rpl_reporting.cc b/sql/rpl_reporting.cc index 6ca09b6869e..d04f18c9c44 100644 --- a/sql/rpl_reporting.cc +++ b/sql/rpl_reporting.cc @@ -22,7 +22,7 @@ #include "sql_class.h" Slave_reporting_capability::Slave_reporting_capability(char const *thread_name) - : m_thread_name(thread_name) + : err_thread_id(0), m_thread_name(thread_name) { mysql_mutex_init(key_mutex_slave_reporting_capability_err_lock, &err_lock, MY_MUTEX_INIT_FAST); diff --git a/storage/perfschema/table_replication_applier_status_by_worker.cc b/storage/perfschema/table_replication_applier_status_by_worker.cc index 3b6ac4031cd..6d7866bc318 100644 --- a/storage/perfschema/table_replication_applier_status_by_worker.cc +++ b/storage/perfschema/table_replication_applier_status_by_worker.cc @@ -109,6 +109,22 @@ int table_replication_applier_status_by_worker::rnd_next(void) } mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); } + else + { + struct pool_bkp_for_pfs *bkp_pool= &global_rpl_thread_pool.pfs_bkp; + if (bkp_pool->inited && bkp_pool->count) + { + for (m_pos.set_at(&m_next_pos); + m_pos.has_more_workers(bkp_pool->count); + m_pos.next_worker()) + { + rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index]; + make_row(rpt); + m_next_pos.set_after(&m_pos); + return 0; + } + } + } return HA_ERR_END_OF_FILE; } @@ -130,6 +146,16 @@ int table_replication_applier_status_by_worker::rnd_pos(const void *pos) res= 0; } } + else + { + struct pool_bkp_for_pfs *bkp_pool= &global_rpl_thread_pool.pfs_bkp; + if (bkp_pool->inited && bkp_pool->count && m_pos.m_index < bkp_pool->count) + { + rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index]; + make_row(rpt); + res= 0; + } + } return res; } |