diff options
-rw-r--r-- | sql/rpl_parallel.cc | 4 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 3 | ||||
-rw-r--r-- | storage/perfschema/table_replication_applier_status_by_worker.cc | 49 |
3 files changed, 27 insertions, 29 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index a96f63039e2..05c6773c470 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1781,6 +1781,7 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) } else bkp->init(pool->count); + bkp->is_valid= false; // Mark backup as stale during pool init } } @@ -2050,7 +2051,7 @@ rpl_parallel_thread::rpl_parallel_thread() rpl_parallel_thread_pool::rpl_parallel_thread_pool() : threads(0), free_list(0), count(0), inited(false), busy(false), - pfs_bkp{0, false, NULL} + pfs_bkp{0, false, false, NULL} { } @@ -2179,6 +2180,7 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli) pfs_rpt->worker_idle_time= rpt->get_worker_idle_time(); pfs_rpt->last_trans_retry_count= rpt->last_trans_retry_count; } + pfs_bkp.is_valid= true; } } diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index d3c46301ff8..85aa8ca33c8 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -260,7 +260,7 @@ struct rpl_parallel_thread { struct pool_bkp_for_pfs{ uint32 count; - bool inited; + bool inited, is_valid; struct rpl_parallel_thread **rpl_thread_arr; void init(uint32 thd_count) { @@ -287,6 +287,7 @@ struct pool_bkp_for_pfs{ my_free(rpl_thread_arr); rpl_thread_arr= NULL; } + inited= false; } }; diff --git a/storage/perfschema/table_replication_applier_status_by_worker.cc b/storage/perfschema/table_replication_applier_status_by_worker.cc index 1ccf75e5d25..e982b5203ec 100644 --- a/storage/perfschema/table_replication_applier_status_by_worker.cc +++ b/storage/perfschema/table_replication_applier_status_by_worker.cc @@ -100,72 +100,67 @@ ha_rows table_replication_applier_status_by_worker::get_row_count() int table_replication_applier_status_by_worker::rnd_next(void) { rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; - if (pool->inited && pool->count) + struct pool_bkp_for_pfs *bkp_pool= &pool->pfs_bkp; + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + if (bkp_pool->inited && bkp_pool->count && bkp_pool->is_valid) { - mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); - uint worker_count= pool->count; for (m_pos.set_at(&m_next_pos); - m_pos.has_more_workers(worker_count); + m_pos.has_more_workers(bkp_pool->count); m_pos.next_worker()) { - rpl_parallel_thread *rpt= pool->threads[m_pos.m_index]; + rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index]; make_row(rpt); m_next_pos.set_after(&m_pos); mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); return 0; } - mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); } else { - mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); - struct pool_bkp_for_pfs *bkp_pool= &pool->pfs_bkp; - if (bkp_pool->inited && bkp_pool->count) + if (pool->inited && pool->count) { + uint worker_count= pool->count; for (m_pos.set_at(&m_next_pos); - m_pos.has_more_workers(bkp_pool->count); - m_pos.next_worker()) + m_pos.has_more_workers(worker_count); + m_pos.next_worker()) { - rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index]; + rpl_parallel_thread *rpt= pool->threads[m_pos.m_index]; make_row(rpt); m_next_pos.set_after(&m_pos); mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); return 0; } } - mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); } + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); return HA_ERR_END_OF_FILE; } int table_replication_applier_status_by_worker::rnd_pos(const void *pos) { int res= HA_ERR_RECORD_DELETED; + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; + struct pool_bkp_for_pfs *bkp_pool= &pool->pfs_bkp; set_position(pos); - - if (global_rpl_thread_pool.inited && global_rpl_thread_pool.count) + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + if (bkp_pool->inited && bkp_pool->count && bkp_pool->is_valid + && m_pos.m_index < bkp_pool->count) { - rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; - mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); - if(m_pos.m_index < pool->count) - { - rpl_parallel_thread *rpt= pool->threads[m_pos.m_index]; - make_row(rpt); - mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); - res= 0; - } + rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index]; + make_row(rpt); + res= 0; } else { - struct pool_bkp_for_pfs *bkp_pool= &global_rpl_thread_pool.pfs_bkp; - if (bkp_pool->inited && bkp_pool->count && m_pos.m_index < bkp_pool->count) + if (pool->inited && pool->count && m_pos.m_index < pool->count) { - rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index]; + rpl_parallel_thread *rpt= pool->threads[m_pos.m_index]; make_row(rpt); res= 0; } } + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); return res; } |