summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel_retry.result1
-rw-r--r--mysql-test/suite/rpl/r/rpl_perfschema_applier_status_by_worker.result42
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel_retry.test6
-rw-r--r--mysql-test/suite/rpl/t/rpl_perfschema_applier_status_by_worker.test96
-rw-r--r--sql/rpl_parallel.cc54
-rw-r--r--sql/rpl_parallel.h35
-rw-r--r--sql/rpl_reporting.cc2
-rw-r--r--storage/perfschema/table_replication_applier_status_by_worker.cc26
8 files changed, 260 insertions, 2 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel_retry.result b/mysql-test/suite/rpl/r/rpl_parallel_retry.result
index 29657a13eb7..2cc4044a2cd 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel_retry.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel_retry.result
@@ -127,6 +127,7 @@ include/wait_for_slave_sql_error.inc [errno=1213]
SET GLOBAL debug_dbug=@old_dbug;
retries
10
+include/assert.inc [Performance Schema retries should match with actual retries]
SELECT * FROM t1 ORDER BY a;
a b
1 3
diff --git a/mysql-test/suite/rpl/r/rpl_perfschema_applier_status_by_worker.result b/mysql-test/suite/rpl/r/rpl_perfschema_applier_status_by_worker.result
index 85f7d52dff7..3545dc606ac 100644
--- a/mysql-test/suite/rpl/r/rpl_perfschema_applier_status_by_worker.result
+++ b/mysql-test/suite/rpl/r/rpl_perfschema_applier_status_by_worker.result
@@ -38,6 +38,48 @@ include/wait_for_slave_to_stop.inc
RESET SLAVE ALL;
SET default_master_connection='';
CHANGE MASTER TO MASTER_USER='root', MASTER_HOST='127.0.0.1',MASTER_PORT=$MASTER_MYPORT;
+include/start_slave.inc
+
+# Introduce an error in the worker thread and check for the correctness
+# of error number, message and timestamp fields.
+
+connection master;
+use test;
+create table t(a int primary key);
+connection slave;
+drop table t;
+connection master;
+insert into t values(1);
+connection slave;
+include/wait_for_slave_sql_error.inc [errno=1146]
+
+# Extract the error related fields from SSS and PS table and compare
+# them for correctness.
+
+include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
+Last_Error_Message
+Error 'Table 'test.t' doesn't exist' on query. Default database: 'test'. Query: 'insert into t values(1)'
+
+# Verify that the error fields are preserved after STOP SLAVE.
+
+
+# 1. Verify that thread_id changes to NULL and service_state to "off" on
+# STOP SLAVE.
+
+include/assert.inc [After STOP SLAVE, thread_id should be NULL]
+include/assert.inc [So, Service_State after STOP SLAVE should be "OFF".]
+
+# 2. Extract the worker_id and the error related fields from SSS and PS
+# table and compare them. These fields should preserve their values.
+
+include/assert.inc [Value returned by SSS and PS table for Last_Error_Number should be same.]
+Last_Error_Message
+Error 'Table 'test.t' doesn't exist' on query. Default database: 'test'. Query: 'insert into t values(1)'
+include/stop_slave.inc
+RESET SLAVE;
+connection master;
+DROP TABLE t;
+RESET MASTER;
# Verify that number of rows in 'replication_applier_status_by_worker' table match with
# number of slave_parallel_workers.
diff --git a/mysql-test/suite/rpl/t/rpl_parallel_retry.test b/mysql-test/suite/rpl/t/rpl_parallel_retry.test
index 048c6e1df9b..8ea48a2f176 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel_retry.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel_retry.test
@@ -149,6 +149,12 @@ let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', V
--disable_query_log
eval SELECT $new_retry - $old_retry AS retries;
--enable_query_log
+let $ps_value= query_get_value(select last_trans_retry_count from
+ performance_schema.replication_applier_status_by_worker where
+ last_trans_retry_count > 0, last_trans_retry_count, 1);
+let $assert_text= Performance Schema retries should match with actual retries;
+let $assert_cond= "$ps_value" = $new_retry - $old_retry;
+source include/assert.inc;
SELECT * FROM t1 ORDER BY a;
STOP SLAVE IO_THREAD;
diff --git a/mysql-test/suite/rpl/t/rpl_perfschema_applier_status_by_worker.test b/mysql-test/suite/rpl/t/rpl_perfschema_applier_status_by_worker.test
index b59ab826bcc..6ded461db30 100644
--- a/mysql-test/suite/rpl/t/rpl_perfschema_applier_status_by_worker.test
+++ b/mysql-test/suite/rpl/t/rpl_perfschema_applier_status_by_worker.test
@@ -133,6 +133,102 @@ STOP SLAVE 'slave1';
RESET SLAVE ALL;
SET default_master_connection='';
evalp CHANGE MASTER TO MASTER_USER='root', MASTER_HOST='127.0.0.1',MASTER_PORT=$MASTER_MYPORT;
+--source include/start_slave.inc
+
+--echo
+--echo # Introduce an error in the worker thread and check for the correctness
+--echo # of error number, message and timestamp fields.
+--echo
+
+# Cause an error in Worker thread.
+# 1) Create a table 't' at master, replicate at slave.
+# 2) Drop table 't' at slave only.
+# 3) Insert a value in table 't' on master and replicate on slave.
+# Since slave doesn't have table 't' anymore, worker thread will report an error.
+
+--connection master
+use test;
+create table t(a int primary key);
+sync_slave_with_master;
+drop table t;
+--connection master
+insert into t values(1);
+--connection slave
+let $slave_sql_errno=1146;
+source include/wait_for_slave_sql_error.inc;
+
+--echo
+--echo # Extract the error related fields from SSS and PS table and compare
+--echo # them for correctness.
+--echo
+
+let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
+let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_worker, Last_Error_Number, 1);
+let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.;
+let $assert_cond= "$sss_value" = "$ps_value";
+source include/assert.inc;
+
+--disable_query_log
+--replace_regex /master-bin.[0-9]+/FILENAME/ /end_log_pos [0-9]+/end_log_pos POSITION/
+select Last_Error_Message from performance_schema.replication_applier_status_by_worker;
+--enable_query_log
+
+--echo
+--echo # Verify that the error fields are preserved after STOP SLAVE.
+--echo
+
+--echo
+--echo # 1. Verify that thread_id changes to NULL and service_state to "off" on
+--echo # STOP SLAVE.
+--echo
+
+let $ps_value= query_get_value(select thread_id from performance_schema.replication_applier_status_by_worker, thread_id, 1);
+let $assert_text= After STOP SLAVE, thread_id should be NULL;
+let $assert_cond= "$ps_value" = "NULL";
+source include/assert.inc;
+
+let $ps_value= query_get_value(select service_state from performance_schema.replication_applier_status_by_worker, service_state, 1);
+let $assert_text= So, Service_State after STOP SLAVE should be "OFF".;
+let $assert_cond= "$ps_value"= "OFF";
+source include/assert.inc;
+
+--echo
+--echo # 2. Extract the worker_id and the error related fields from SSS and PS
+--echo # table and compare them. These fields should preserve their values.
+--echo
+
+let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
+let $ps_value= query_get_value(select Last_Error_Number from performance_schema.replication_applier_status_by_worker, Last_Error_Number, 1);
+let $assert_text= Value returned by SSS and PS table for Last_Error_Number should be same.;
+let $assert_cond= "$sss_value" = "$ps_value";
+source include/assert.inc;
+
+--disable_query_log
+--replace_regex /master-bin.[0-9]+/FILENAME/ /end_log_pos [0-9]+/end_log_pos POSITION/
+select Last_Error_Message from performance_schema.replication_applier_status_by_worker;
+--enable_query_log
+
+# The timestamp format is slightly different in SSS and PS.
+# SSS => YYMMDD HH:MM:SS
+# PS => YYYY-MM-DD HH:MM:SS
+# To match the two, we get rid of hyphens from PS output and the first two
+# digits of the year field so that it can be matched directly.
+
+#--- TODO: Can we include Last_SQL_Error_Timestamp as part of SSS
+
+#let $sss_value= query_get_value(SHOW SLAVE STATUS, Last_SQL_Error_Timestamp, 1);
+#let $ps_value= query_get_value(select Last_Error_Timestamp from performance_schema.replication_applier_status_by_worker, Last_Error_Timestamp, 1);
+#let $ps_value_without_hyphens= `SELECT REPLACE("$ps_value", '-', '')`;
+#let $ps_value_in_sss_format= `select substring("$ps_value_without_hyphens", 3)`;
+#let $assert_text= Value returned by SSS and PS table for Last_Error_Timestamp should be same.;
+#let $assert_cond= "$sss_value" = "$ps_value_in_sss_format";
+#source include/assert.inc;
+
+--source include/stop_slave.inc
+RESET SLAVE;
+--connection master
+DROP TABLE t;
+RESET MASTER;
--echo
--echo # Verify that number of rows in 'replication_applier_status_by_worker' table match with
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 44fba4e30e7..8be1964b762 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -1752,6 +1752,7 @@ int
rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
{
int rc= 0;
+ struct pool_bkp_for_pfs* bkp= &pool->pfs_bkp;
if ((rc= pool_mark_busy(pool, current_thd)))
return rc; // killed
@@ -1761,6 +1762,23 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
pool_mark_not_busy(pool);
rc= rpl_parallel_change_thread_count(pool, opt_slave_parallel_threads,
0);
+ if (!rc)
+ {
+ if (pool->count)
+ {
+ if (bkp->inited)
+ {
+ if (bkp->count != pool->count)
+ {
+ bkp->destroy();
+ bkp->init(pool->count);
+ }
+ }
+ else
+ bkp->init(pool->count);
+ }
+ }
+
}
else
{
@@ -2026,7 +2044,8 @@ rpl_parallel_thread::rpl_parallel_thread()
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
- : threads(0), free_list(0), count(0), inited(false), busy(false)
+ : threads(0), free_list(0), count(0), inited(false), busy(false),
+ pfs_bkp{0, false, NULL}
{
}
@@ -2057,6 +2076,7 @@ void
rpl_parallel_thread_pool::destroy()
{
deactivate();
+ pfs_bkp.destroy();
destroy_cond_mutex();
}
@@ -2125,6 +2145,37 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
mysql_mutex_unlock(&LOCK_rpl_thread_pool);
}
+void
+rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
+{
+ if (pfs_bkp.inited)
+ {
+ for(uint i=0; i<count;i++)
+ {
+ rpl_parallel_thread *rpt, *pfs_rpt;
+ rpt= threads[i];
+ pfs_rpt= pfs_bkp.rpl_thread_arr[i];
+ if (rpt->channel_name_length)
+ {
+ pfs_rpt->channel_name_length= rpt->channel_name_length;
+ strmake(pfs_rpt->channel_name, rpt->channel_name,
+ rpt->channel_name_length);
+ }
+ pfs_rpt->thd= rpt->thd;
+ pfs_rpt->last_seen_gtid= rpt->last_seen_gtid;
+ if (rli->err_thread_id && rpt->thd->thread_id == rli->err_thread_id)
+ {
+ pfs_rpt->last_error_number= rli->last_error().number;
+ strmake(pfs_rpt->last_error_message,
+ rli->last_error().message, sizeof(rli->last_error().message));
+ pfs_rpt->last_error_timestamp= rli->last_error().skr*1000000;
+ }
+ pfs_rpt->running= false;
+ pfs_rpt->worker_idle_time= rpt->get_worker_idle_time();
+ pfs_rpt->last_trans_retry_count= rpt->last_trans_retry_count;
+ }
+ }
+}
/*
Obtain a worker thread that we can queue an event to.
@@ -2393,6 +2444,7 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
STRING_WITH_LEN("now SIGNAL wait_for_done_waiting"));
};);
+ global_rpl_thread_pool.copy_pool_for_pfs(rli);
for (i= 0; i < domain_hash.records; ++i)
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 4f7bd49120a..cc7795b4b0d 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -7,6 +7,7 @@
struct rpl_parallel;
struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;
+extern struct rpl_parallel_thread_pool pool_bkp_for_pfs;
class Relay_log_info;
struct inuse_relaylog;
@@ -257,6 +258,38 @@ struct rpl_parallel_thread {
};
+struct pool_bkp_for_pfs{
+ uint32 count;
+ bool inited;
+ struct rpl_parallel_thread **rpl_thread_arr;
+ void init(uint32 thd_count)
+ {
+ DBUG_ASSERT(thd_count);
+ rpl_thread_arr= (rpl_parallel_thread **)
+ my_malloc(PSI_INSTRUMENT_ME,
+ thd_count * sizeof(rpl_parallel_thread*),
+ MYF(MY_WME | MY_ZEROFILL));
+ for (uint i=0; i<thd_count; i++)
+ rpl_thread_arr[i]= (rpl_parallel_thread *)
+ my_malloc(PSI_INSTRUMENT_ME, sizeof(rpl_parallel_thread),
+ MYF(MY_WME | MY_ZEROFILL));
+ count= thd_count;
+ inited= true;
+ }
+
+ void destroy()
+ {
+ if (inited)
+ {
+ for (uint i=0; i<count; i++)
+ my_free(rpl_thread_arr[i]);
+
+ my_free(rpl_thread_arr);
+ rpl_thread_arr= NULL;
+ }
+ }
+};
+
struct rpl_parallel_thread_pool {
struct rpl_parallel_thread **threads;
struct rpl_parallel_thread *free_list;
@@ -270,8 +303,10 @@ struct rpl_parallel_thread_pool {
is in progress.
*/
bool busy;
+ struct pool_bkp_for_pfs pfs_bkp;
rpl_parallel_thread_pool();
+ void copy_pool_for_pfs(Relay_log_info *rli);
int init(uint32 size);
void destroy();
void deactivate();
diff --git a/sql/rpl_reporting.cc b/sql/rpl_reporting.cc
index 6ca09b6869e..d04f18c9c44 100644
--- a/sql/rpl_reporting.cc
+++ b/sql/rpl_reporting.cc
@@ -22,7 +22,7 @@
#include "sql_class.h"
Slave_reporting_capability::Slave_reporting_capability(char const *thread_name)
- : m_thread_name(thread_name)
+ : err_thread_id(0), m_thread_name(thread_name)
{
mysql_mutex_init(key_mutex_slave_reporting_capability_err_lock,
&err_lock, MY_MUTEX_INIT_FAST);
diff --git a/storage/perfschema/table_replication_applier_status_by_worker.cc b/storage/perfschema/table_replication_applier_status_by_worker.cc
index 3b6ac4031cd..6d7866bc318 100644
--- a/storage/perfschema/table_replication_applier_status_by_worker.cc
+++ b/storage/perfschema/table_replication_applier_status_by_worker.cc
@@ -109,6 +109,22 @@ int table_replication_applier_status_by_worker::rnd_next(void)
}
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
}
+ else
+ {
+ struct pool_bkp_for_pfs *bkp_pool= &global_rpl_thread_pool.pfs_bkp;
+ if (bkp_pool->inited && bkp_pool->count)
+ {
+ for (m_pos.set_at(&m_next_pos);
+ m_pos.has_more_workers(bkp_pool->count);
+ m_pos.next_worker())
+ {
+ rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index];
+ make_row(rpt);
+ m_next_pos.set_after(&m_pos);
+ return 0;
+ }
+ }
+ }
return HA_ERR_END_OF_FILE;
}
@@ -130,6 +146,16 @@ int table_replication_applier_status_by_worker::rnd_pos(const void *pos)
res= 0;
}
}
+ else
+ {
+ struct pool_bkp_for_pfs *bkp_pool= &global_rpl_thread_pool.pfs_bkp;
+ if (bkp_pool->inited && bkp_pool->count && m_pos.m_index < bkp_pool->count)
+ {
+ rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index];
+ make_row(rpt);
+ res= 0;
+ }
+ }
return res;
}