summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2021-07-28 18:09:21 -0700
committerdormando <dormando@rydia.net>2021-08-09 17:09:08 -0700
commit331dca5d644edefd99893c44827bdf2ca72f85be (patch)
treef3c51c0e55ef1b55435df715bf47a0899fd5fbd5
parente52734366c94475db33d3239f48b4542ec2d9c2f (diff)
downloadmemcached-331dca5d644edefd99893c44827bdf2ca72f85be.tar.gz
thread: per-worker-thread connection event queues
help scalability a bit by having a per-worker-thread freelist and queue for connection event items (new conns, etc). Also removes a hand-rolled linked list and uses cache.c for freelist handling to cull some redundancy.
-rw-r--r--memcached.h2
-rw-r--r--thread.c117
2 files changed, 39 insertions, 80 deletions
diff --git a/memcached.h b/memcached.h
index 739c939..4c91560 100644
--- a/memcached.h
+++ b/memcached.h
@@ -629,7 +629,7 @@ typedef struct {
int notify_receive_fd; /* receiving end of notify pipe */
int notify_send_fd; /* sending end of notify pipe */
struct thread_stats stats; /* Stats generated by this thread */
- struct conn_queue *new_conn_queue; /* queue of new connections to handle */
+ struct conn_queue *ev_queue; /* Worker/conn event queue */
cache_t *rbuf_cache; /* static-sized read buffers */
mc_resp_bundle *open_bundle;
cache_t *io_cache; /* IO objects */
diff --git a/thread.c b/thread.c
index 874cc9c..48c116a 100644
--- a/thread.c
+++ b/thread.c
@@ -13,6 +13,8 @@
#include <string.h>
#include <pthread.h>
+#include "queue.h"
+
#ifdef __sun
#include <atomic.h>
#endif
@@ -37,15 +39,15 @@ struct conn_queue_item {
enum conn_queue_item_modes mode;
conn *c;
void *ssl;
- CQ_ITEM *next;
+ STAILQ_ENTRY(conn_queue_item) i_next;
};
/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
- CQ_ITEM *head;
- CQ_ITEM *tail;
+ STAILQ_HEAD(conn_ev_head, conn_queue_item) head;
pthread_mutex_t lock;
+ cache_t *cache; /* freelisted objects */
};
/* Locks for cache LRU operations */
@@ -64,10 +66,6 @@ static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
/* Lock to cause worker threads to hang up after being woken */
static pthread_mutex_t worker_hang_lock;
-/* Free list of CQ_ITEM structs */
-static CQ_ITEM *cqi_freelist;
-static pthread_mutex_t cqi_freelist_lock;
-
static pthread_mutex_t *item_locks;
/* size of the item lock hash table */
static uint32_t item_lock_count;
@@ -265,8 +263,12 @@ void stop_threads(void) {
*/
static void cq_init(CQ *cq) {
pthread_mutex_init(&cq->lock, NULL);
- cq->head = NULL;
- cq->tail = NULL;
+ STAILQ_INIT(&cq->head);
+ cq->cache = cache_create("cq", sizeof(CQ_ITEM), sizeof(char *));
+ if (cq->cache == NULL) {
+ fprintf(stderr, "Failed to create connection queue cache\n");
+ exit(EXIT_FAILURE);
+ }
}
/*
@@ -278,11 +280,9 @@ static CQ_ITEM *cq_pop(CQ *cq) {
CQ_ITEM *item;
pthread_mutex_lock(&cq->lock);
- item = cq->head;
- if (NULL != item) {
- cq->head = item->next;
- if (NULL == cq->head)
- cq->tail = NULL;
+ item = STAILQ_FIRST(&cq->head);
+ if (item != NULL) {
+ STAILQ_REMOVE_HEAD(&cq->head, i_next);
}
pthread_mutex_unlock(&cq->lock);
@@ -293,70 +293,31 @@ static CQ_ITEM *cq_pop(CQ *cq) {
* Adds an item to a connection queue.
*/
static void cq_push(CQ *cq, CQ_ITEM *item) {
- item->next = NULL;
-
pthread_mutex_lock(&cq->lock);
- if (NULL == cq->tail)
- cq->head = item;
- else
- cq->tail->next = item;
- cq->tail = item;
+ STAILQ_INSERT_TAIL(&cq->head, item, i_next);
pthread_mutex_unlock(&cq->lock);
}
/*
* Returns a fresh connection queue item.
*/
-static CQ_ITEM *cqi_new(void) {
- CQ_ITEM *item = NULL;
- pthread_mutex_lock(&cqi_freelist_lock);
- if (cqi_freelist) {
- item = cqi_freelist;
- cqi_freelist = item->next;
- }
- pthread_mutex_unlock(&cqi_freelist_lock);
-
- if (NULL == item) {
- int i;
-
- /* Allocate a bunch of items at once to reduce fragmentation */
- item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
- if (NULL == item) {
- STATS_LOCK();
- stats.malloc_fails++;
- STATS_UNLOCK();
- return NULL;
- }
-
- /*
- * Link together all the new items except the first one
- * (which we'll return to the caller) for placement on
- * the freelist.
- */
- for (i = 2; i < ITEMS_PER_ALLOC; i++)
- item[i - 1].next = &item[i];
-
- pthread_mutex_lock(&cqi_freelist_lock);
- item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
- cqi_freelist = &item[1];
- pthread_mutex_unlock(&cqi_freelist_lock);
+static CQ_ITEM *cqi_new(CQ *cq) {
+ CQ_ITEM *item = cache_alloc(cq->cache);
+ if (item == NULL) {
+ STATS_LOCK();
+ stats.malloc_fails++;
+ STATS_UNLOCK();
}
-
return item;
}
-
/*
* Frees a connection queue item (adds it to the freelist.)
*/
-static void cqi_free(CQ_ITEM *item) {
- pthread_mutex_lock(&cqi_freelist_lock);
- item->next = cqi_freelist;
- cqi_freelist = item;
- pthread_mutex_unlock(&cqi_freelist_lock);
+static void cqi_free(CQ *cq, CQ_ITEM *item) {
+ cache_free(cq->cache, item);
}
-
/*
* Creates a worker thread.
*/
@@ -412,12 +373,12 @@ static void setup_thread(LIBEVENT_THREAD *me) {
exit(1);
}
- me->new_conn_queue = malloc(sizeof(struct conn_queue));
- if (me->new_conn_queue == NULL) {
+ me->ev_queue = malloc(sizeof(struct conn_queue));
+ if (me->ev_queue == NULL) {
perror("Failed to allocate memory for connection queue");
exit(EXIT_FAILURE);
}
- cq_init(me->new_conn_queue);
+ cq_init(me->ev_queue);
if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
perror("Failed to initialize mutex");
@@ -507,7 +468,7 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
switch (buf[0]) {
case 'c':
- item = cq_pop(me->new_conn_queue);
+ item = cq_pop(me->ev_queue);
if (NULL == item) {
break;
@@ -553,7 +514,7 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
}
break;
}
- cqi_free(item);
+ cqi_free(me->ev_queue, item);
break;
/* we were told to pause and report in */
case 'p':
@@ -665,15 +626,8 @@ select:
*/
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport, void *ssl) {
- CQ_ITEM *item = cqi_new();
+ CQ_ITEM *item = NULL;
char buf[1];
- if (item == NULL) {
- close(sfd);
- /* given that malloc failed this may also fail, but let's try */
- fprintf(stderr, "Failed to allocate memory for connection object\n");
- return;
- }
-
LIBEVENT_THREAD *thread;
if (!settings.num_napi_ids)
@@ -681,6 +635,14 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
else
thread = select_thread_by_napi_id(sfd);
+ item = cqi_new(thread->ev_queue);
+ if (item == NULL) {
+ close(sfd);
+ /* given that malloc failed this may also fail, but let's try */
+ fprintf(stderr, "Failed to allocate memory for connection object\n");
+ return;
+ }
+
item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
@@ -689,7 +651,7 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
item->mode = queue_new_conn;
item->ssl = ssl;
- cq_push(thread->new_conn_queue, item);
+ cq_push(thread->ev_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
buf[0] = 'c';
@@ -946,9 +908,6 @@ void memcached_thread_init(int nthreads, void *arg) {
pthread_mutex_init(&init_lock, NULL);
pthread_cond_init(&init_cond, NULL);
- pthread_mutex_init(&cqi_freelist_lock, NULL);
- cqi_freelist = NULL;
-
/* Want a wide lock table, but don't waste memory */
if (nthreads < 3) {
power = 10;