diff options
author | dormando <dormando@rydia.net> | 2021-07-28 18:09:21 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2021-08-09 17:09:08 -0700 |
commit | 331dca5d644edefd99893c44827bdf2ca72f85be (patch) | |
tree | f3c51c0e55ef1b55435df715bf47a0899fd5fbd5 | |
parent | e52734366c94475db33d3239f48b4542ec2d9c2f (diff) | |
download | memcached-331dca5d644edefd99893c44827bdf2ca72f85be.tar.gz |
thread: per-worker-thread connection event queues
help scalability a bit by having a per-worker-thread freelist and queue
for connection event items (new conns, etc). Also removes a hand-rolled
linked list and uses cache.c for freelist handling to cull some
redundancy.
-rw-r--r-- | memcached.h | 2 | ||||
-rw-r--r-- | thread.c | 117 |
2 files changed, 39 insertions, 80 deletions
diff --git a/memcached.h b/memcached.h index 739c939..4c91560 100644 --- a/memcached.h +++ b/memcached.h @@ -629,7 +629,7 @@ typedef struct { int notify_receive_fd; /* receiving end of notify pipe */ int notify_send_fd; /* sending end of notify pipe */ struct thread_stats stats; /* Stats generated by this thread */ - struct conn_queue *new_conn_queue; /* queue of new connections to handle */ + struct conn_queue *ev_queue; /* Worker/conn event queue */ cache_t *rbuf_cache; /* static-sized read buffers */ mc_resp_bundle *open_bundle; cache_t *io_cache; /* IO objects */ @@ -13,6 +13,8 @@ #include <string.h> #include <pthread.h> +#include "queue.h" + #ifdef __sun #include <atomic.h> #endif @@ -37,15 +39,15 @@ struct conn_queue_item { enum conn_queue_item_modes mode; conn *c; void *ssl; - CQ_ITEM *next; + STAILQ_ENTRY(conn_queue_item) i_next; }; /* A connection queue. */ typedef struct conn_queue CQ; struct conn_queue { - CQ_ITEM *head; - CQ_ITEM *tail; + STAILQ_HEAD(conn_ev_head, conn_queue_item) head; pthread_mutex_t lock; + cache_t *cache; /* freelisted objects */ }; /* Locks for cache LRU operations */ @@ -64,10 +66,6 @@ static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; /* Lock to cause worker threads to hang up after being woken */ static pthread_mutex_t worker_hang_lock; -/* Free list of CQ_ITEM structs */ -static CQ_ITEM *cqi_freelist; -static pthread_mutex_t cqi_freelist_lock; - static pthread_mutex_t *item_locks; /* size of the item lock hash table */ static uint32_t item_lock_count; @@ -265,8 +263,12 @@ void stop_threads(void) { */ static void cq_init(CQ *cq) { pthread_mutex_init(&cq->lock, NULL); - cq->head = NULL; - cq->tail = NULL; + STAILQ_INIT(&cq->head); + cq->cache = cache_create("cq", sizeof(CQ_ITEM), sizeof(char *)); + if (cq->cache == NULL) { + fprintf(stderr, "Failed to create connection queue cache\n"); + exit(EXIT_FAILURE); + } } /* @@ -278,11 +280,9 @@ static CQ_ITEM *cq_pop(CQ *cq) { CQ_ITEM *item; pthread_mutex_lock(&cq->lock); - item = cq->head; - if (NULL != item) { - cq->head = item->next; - if (NULL == cq->head) - cq->tail = NULL; + item = STAILQ_FIRST(&cq->head); + if (item != NULL) { + STAILQ_REMOVE_HEAD(&cq->head, i_next); } pthread_mutex_unlock(&cq->lock); @@ -293,70 +293,31 @@ static CQ_ITEM *cq_pop(CQ *cq) { * Adds an item to a connection queue. */ static void cq_push(CQ *cq, CQ_ITEM *item) { - item->next = NULL; - pthread_mutex_lock(&cq->lock); - if (NULL == cq->tail) - cq->head = item; - else - cq->tail->next = item; - cq->tail = item; + STAILQ_INSERT_TAIL(&cq->head, item, i_next); pthread_mutex_unlock(&cq->lock); } /* * Returns a fresh connection queue item. */ -static CQ_ITEM *cqi_new(void) { - CQ_ITEM *item = NULL; - pthread_mutex_lock(&cqi_freelist_lock); - if (cqi_freelist) { - item = cqi_freelist; - cqi_freelist = item->next; - } - pthread_mutex_unlock(&cqi_freelist_lock); - - if (NULL == item) { - int i; - - /* Allocate a bunch of items at once to reduce fragmentation */ - item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC); - if (NULL == item) { - STATS_LOCK(); - stats.malloc_fails++; - STATS_UNLOCK(); - return NULL; - } - - /* - * Link together all the new items except the first one - * (which we'll return to the caller) for placement on - * the freelist. - */ - for (i = 2; i < ITEMS_PER_ALLOC; i++) - item[i - 1].next = &item[i]; - - pthread_mutex_lock(&cqi_freelist_lock); - item[ITEMS_PER_ALLOC - 1].next = cqi_freelist; - cqi_freelist = &item[1]; - pthread_mutex_unlock(&cqi_freelist_lock); +static CQ_ITEM *cqi_new(CQ *cq) { + CQ_ITEM *item = cache_alloc(cq->cache); + if (item == NULL) { + STATS_LOCK(); + stats.malloc_fails++; + STATS_UNLOCK(); } - return item; } - /* * Frees a connection queue item (adds it to the freelist.) */ -static void cqi_free(CQ_ITEM *item) { - pthread_mutex_lock(&cqi_freelist_lock); - item->next = cqi_freelist; - cqi_freelist = item; - pthread_mutex_unlock(&cqi_freelist_lock); +static void cqi_free(CQ *cq, CQ_ITEM *item) { + cache_free(cq->cache, item); } - /* * Creates a worker thread. */ @@ -412,12 +373,12 @@ static void setup_thread(LIBEVENT_THREAD *me) { exit(1); } - me->new_conn_queue = malloc(sizeof(struct conn_queue)); - if (me->new_conn_queue == NULL) { + me->ev_queue = malloc(sizeof(struct conn_queue)); + if (me->ev_queue == NULL) { perror("Failed to allocate memory for connection queue"); exit(EXIT_FAILURE); } - cq_init(me->new_conn_queue); + cq_init(me->ev_queue); if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) { perror("Failed to initialize mutex"); @@ -507,7 +468,7 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) switch (buf[0]) { case 'c': - item = cq_pop(me->new_conn_queue); + item = cq_pop(me->ev_queue); if (NULL == item) { break; @@ -553,7 +514,7 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) } break; } - cqi_free(item); + cqi_free(me->ev_queue, item); break; /* we were told to pause and report in */ case 'p': @@ -665,15 +626,8 @@ select: */ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport, void *ssl) { - CQ_ITEM *item = cqi_new(); + CQ_ITEM *item = NULL; char buf[1]; - if (item == NULL) { - close(sfd); - /* given that malloc failed this may also fail, but let's try */ - fprintf(stderr, "Failed to allocate memory for connection object\n"); - return; - } - LIBEVENT_THREAD *thread; if (!settings.num_napi_ids) @@ -681,6 +635,14 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, else thread = select_thread_by_napi_id(sfd); + item = cqi_new(thread->ev_queue); + if (item == NULL) { + close(sfd); + /* given that malloc failed this may also fail, but let's try */ + fprintf(stderr, "Failed to allocate memory for connection object\n"); + return; + } + item->sfd = sfd; item->init_state = init_state; item->event_flags = event_flags; @@ -689,7 +651,7 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, item->mode = queue_new_conn; item->ssl = ssl; - cq_push(thread->new_conn_queue, item); + cq_push(thread->ev_queue, item); MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id); buf[0] = 'c'; @@ -946,9 +908,6 @@ void memcached_thread_init(int nthreads, void *arg) { pthread_mutex_init(&init_lock, NULL); pthread_cond_init(&init_cond, NULL); - pthread_mutex_init(&cqi_freelist_lock, NULL); - cqi_freelist = NULL; - /* Want a wide lock table, but don't waste memory */ if (nthreads < 3) { power = 10; |