summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/rpl_parallel.cc4
-rw-r--r--sql/rpl_parallel.h3
-rw-r--r--storage/perfschema/table_replication_applier_status_by_worker.cc49
3 files changed, 27 insertions, 29 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index a96f63039e2..05c6773c470 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -1781,6 +1781,7 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
}
else
bkp->init(pool->count);
+ bkp->is_valid= false; // Mark backup as stale during pool init
}
}
@@ -2050,7 +2051,7 @@ rpl_parallel_thread::rpl_parallel_thread()
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
: threads(0), free_list(0), count(0), inited(false), busy(false),
- pfs_bkp{0, false, NULL}
+ pfs_bkp{0, false, false, NULL}
{
}
@@ -2179,6 +2180,7 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
pfs_rpt->worker_idle_time= rpt->get_worker_idle_time();
pfs_rpt->last_trans_retry_count= rpt->last_trans_retry_count;
}
+ pfs_bkp.is_valid= true;
}
}
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index d3c46301ff8..85aa8ca33c8 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -260,7 +260,7 @@ struct rpl_parallel_thread {
struct pool_bkp_for_pfs{
uint32 count;
- bool inited;
+ bool inited, is_valid;
struct rpl_parallel_thread **rpl_thread_arr;
void init(uint32 thd_count)
{
@@ -287,6 +287,7 @@ struct pool_bkp_for_pfs{
my_free(rpl_thread_arr);
rpl_thread_arr= NULL;
}
+ inited= false;
}
};
diff --git a/storage/perfschema/table_replication_applier_status_by_worker.cc b/storage/perfschema/table_replication_applier_status_by_worker.cc
index 1ccf75e5d25..e982b5203ec 100644
--- a/storage/perfschema/table_replication_applier_status_by_worker.cc
+++ b/storage/perfschema/table_replication_applier_status_by_worker.cc
@@ -100,72 +100,67 @@ ha_rows table_replication_applier_status_by_worker::get_row_count()
int table_replication_applier_status_by_worker::rnd_next(void)
{
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
- if (pool->inited && pool->count)
+ struct pool_bkp_for_pfs *bkp_pool= &pool->pfs_bkp;
+ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
+ if (bkp_pool->inited && bkp_pool->count && bkp_pool->is_valid)
{
- mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
- uint worker_count= pool->count;
for (m_pos.set_at(&m_next_pos);
- m_pos.has_more_workers(worker_count);
+ m_pos.has_more_workers(bkp_pool->count);
m_pos.next_worker())
{
- rpl_parallel_thread *rpt= pool->threads[m_pos.m_index];
+ rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index];
make_row(rpt);
m_next_pos.set_after(&m_pos);
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
return 0;
}
- mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
}
else
{
- mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
- struct pool_bkp_for_pfs *bkp_pool= &pool->pfs_bkp;
- if (bkp_pool->inited && bkp_pool->count)
+ if (pool->inited && pool->count)
{
+ uint worker_count= pool->count;
for (m_pos.set_at(&m_next_pos);
- m_pos.has_more_workers(bkp_pool->count);
- m_pos.next_worker())
+ m_pos.has_more_workers(worker_count);
+ m_pos.next_worker())
{
- rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index];
+ rpl_parallel_thread *rpt= pool->threads[m_pos.m_index];
make_row(rpt);
m_next_pos.set_after(&m_pos);
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
return 0;
}
}
- mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
}
+ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
return HA_ERR_END_OF_FILE;
}
int table_replication_applier_status_by_worker::rnd_pos(const void *pos)
{
int res= HA_ERR_RECORD_DELETED;
+ rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
+ struct pool_bkp_for_pfs *bkp_pool= &pool->pfs_bkp;
set_position(pos);
-
- if (global_rpl_thread_pool.inited && global_rpl_thread_pool.count)
+ mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
+ if (bkp_pool->inited && bkp_pool->count && bkp_pool->is_valid
+ && m_pos.m_index < bkp_pool->count)
{
- rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
- mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
- if(m_pos.m_index < pool->count)
- {
- rpl_parallel_thread *rpt= pool->threads[m_pos.m_index];
- make_row(rpt);
- mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
- res= 0;
- }
+ rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index];
+ make_row(rpt);
+ res= 0;
}
else
{
- struct pool_bkp_for_pfs *bkp_pool= &global_rpl_thread_pool.pfs_bkp;
- if (bkp_pool->inited && bkp_pool->count && m_pos.m_index < bkp_pool->count)
+ if (pool->inited && pool->count && m_pos.m_index < pool->count)
{
- rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index];
+ rpl_parallel_thread *rpt= pool->threads[m_pos.m_index];
make_row(rpt);
res= 0;
}
}
+ mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
return res;
}