diff options
Diffstat (limited to 'thread.c')
-rw-r--r-- | thread.c | 59 |
1 files changed, 43 insertions, 16 deletions
@@ -46,6 +46,10 @@ static pthread_mutex_t stats_lock; static CQ_ITEM *cqi_freelist; static pthread_mutex_t cqi_freelist_lock; +static pthread_mutex_t *item_locks; +/* TODO: Make this a function of the # of threads */ +#define ITEM_LOCKS 4000 + static LIBEVENT_DISPATCHER_THREAD dispatcher_thread; /* @@ -64,6 +68,14 @@ static pthread_cond_t init_cond; static void thread_libevent_process(int fd, short which, void *arg); +void item_lock(uint32_t hv) { + mutex_lock(&item_locks[hv % ITEM_LOCKS]); +} + +void item_unlock(uint32_t hv) { + pthread_mutex_unlock(&item_locks[hv % ITEM_LOCKS]); +} + /* * Initializes a connection queue. */ @@ -342,9 +354,9 @@ item *item_get(const char *key, const size_t nkey) { item *it; uint32_t hv; hv = hash(key, nkey, 0); - mutex_lock(&cache_lock); + item_lock(hv); it = do_item_get(key, nkey, hv); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); return it; } @@ -352,9 +364,9 @@ item *item_touch(const char *key, size_t nkey, uint32_t exptime) { item *it; uint32_t hv; hv = hash(key, nkey, 0); - mutex_lock(&cache_lock); + item_lock(hv); it = do_item_touch(key, nkey, exptime, hv); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); return it; } @@ -366,9 +378,9 @@ int item_link(item *item) { uint32_t hv; hv = hash(ITEM_key(item), item->nkey, 0); - mutex_lock(&cache_lock); + item_lock(hv); ret = do_item_link(item, hv); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); return ret; } @@ -377,9 +389,12 @@ int item_link(item *item) { * needed. */ void item_remove(item *item) { - mutex_lock(&cache_lock); + uint32_t hv; + hv = hash(ITEM_key(item), item->nkey, 0); + + item_lock(hv); do_item_remove(item); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); } /* @@ -397,18 +412,21 @@ int item_replace(item *old_it, item *new_it, const uint32_t hv) { void item_unlink(item *item) { uint32_t hv; hv = hash(ITEM_key(item), item->nkey, 0); - mutex_lock(&cache_lock); + item_lock(hv); do_item_unlink(item, hv); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); } /* * Moves an item to the back of the LRU queue. */ void item_update(item *item) { - mutex_lock(&cache_lock); + uint32_t hv; + hv = hash(ITEM_key(item), item->nkey, 0); + + item_lock(hv); do_item_update(item); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); } /* @@ -422,9 +440,9 @@ enum delta_result_type add_delta(conn *c, const char *key, uint32_t hv; hv = hash(key, nkey, 0); - mutex_lock(&cache_lock); + item_lock(hv); ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); return ret; } @@ -436,9 +454,9 @@ enum store_item_type store_item(item *item, int comm, conn* c) { uint32_t hv; hv = hash(ITEM_key(item), item->nkey, 0); - mutex_lock(&cache_lock); + item_lock(hv); ret = do_store_item(item, comm, c, hv); - pthread_mutex_unlock(&cache_lock); + item_unlock(hv); return ret; } @@ -616,6 +634,15 @@ void thread_init(int nthreads, struct event_base *main_base) { pthread_mutex_init(&cqi_freelist_lock, NULL); cqi_freelist = NULL; + item_locks = calloc(ITEM_LOCKS, sizeof(pthread_mutex_t)); + if (! item_locks) { + perror("Can't allocate item locks"); + exit(1); + } + for (i = 0; i < ITEM_LOCKS; i++) { + pthread_mutex_init(&item_locks[i], NULL); + } + threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); if (! threads) { perror("Can't allocate thread descriptors"); |