summaryrefslogtreecommitdiff
path: root/sql/threadpool_generic.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/threadpool_generic.cc')
-rw-r--r--sql/threadpool_generic.cc31
1 files changed, 18 insertions, 13 deletions
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index 86b039d5319..7aebd217d68 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -92,7 +92,7 @@ static PSI_thread_info thread_list[] =
thread_group_t *all_groups;
static uint group_count;
-static int32 shutdown_group_count;
+static Atomic_counter<uint32_t> shutdown_group_count;
/**
Used for printing "pool blocked" message, see
@@ -106,7 +106,7 @@ struct pool_timer_t
mysql_mutex_t mutex;
mysql_cond_t cond;
volatile uint64 current_microtime;
- volatile uint64 next_timeout_check;
+ std::atomic<uint64_t> next_timeout_check;
int tick_interval;
bool shutdown;
pthread_t timer_thread_id;
@@ -525,7 +525,8 @@ static void* timer_thread(void *param)
my_thread_init();
DBUG_ENTER("timer_thread");
- timer->next_timeout_check= ULONGLONG_MAX;
+ timer->next_timeout_check.store(std::numeric_limits<uint64_t>::max(),
+ std::memory_order_relaxed);
timer->current_microtime= microsecond_interval_timer();
for(;;)
@@ -553,11 +554,12 @@ static void* timer_thread(void *param)
}
/* Check if any client exceeded wait_timeout */
- if (timer->next_timeout_check <= timer->current_microtime)
+ if (timer->next_timeout_check.load(std::memory_order_relaxed) <=
+ timer->current_microtime)
{
/* Reset next timeout check, it will be recalculated below */
- my_atomic_fas64((volatile int64*) &timer->next_timeout_check,
- ULONGLONG_MAX);
+ timer->next_timeout_check.store(std::numeric_limits<uint64_t>::max(),
+ std::memory_order_relaxed);
server_threads.iterate(timeout_check, timer);
}
}
@@ -809,7 +811,7 @@ static void add_thread_count(thread_group_t *thread_group, int32 count)
thread_group->thread_count += count;
/* worker starts out and end in "active" state */
thread_group->active_thread_count += count;
- my_atomic_add32(&tp_stats.num_worker_threads, count);
+ tp_stats.num_worker_threads+= count;
}
@@ -829,7 +831,7 @@ static int create_worker(thread_group_t *thread_group, bool due_to_stall)
int err;
DBUG_ENTER("create_worker");
- if (tp_stats.num_worker_threads >= (int)threadpool_max_threads
+ if (tp_stats.num_worker_threads >= threadpool_max_threads
&& thread_group->thread_count >= 2)
{
err= 1;
@@ -978,7 +980,7 @@ void thread_group_destroy(thread_group_t *thread_group)
}
#endif
- if (my_atomic_add32(&shutdown_group_count, -1) == 1)
+ if (!--shutdown_group_count)
my_free(all_groups);
}
@@ -1331,12 +1333,15 @@ void TP_connection_generic::wait_end()
static void set_next_timeout_check(ulonglong abstime)
{
+ auto old= pool_timer.next_timeout_check.load(std::memory_order_relaxed);
DBUG_ENTER("set_next_timeout_check");
- while(abstime < pool_timer.next_timeout_check)
+ while (abstime < old)
{
- longlong old= (longlong)pool_timer.next_timeout_check;
- my_atomic_cas64((volatile int64*)&pool_timer.next_timeout_check,
- &old, abstime);
+ if (pool_timer.next_timeout_check.
+ compare_exchange_weak(old, abstime,
+ std::memory_order_relaxed,
+ std::memory_order_relaxed))
+ break;
}
DBUG_VOID_RETURN;
}