summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c59
1 files changed, 43 insertions, 16 deletions
diff --git a/thread.c b/thread.c
index efcc739..c201c5d 100644
--- a/thread.c
+++ b/thread.c
@@ -46,6 +46,10 @@ static pthread_mutex_t stats_lock;
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;
+static pthread_mutex_t *item_locks;
+/* TODO: Make this a function of the # of threads */
+#define ITEM_LOCKS 4000
+
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
/*
@@ -64,6 +68,14 @@ static pthread_cond_t init_cond;
static void thread_libevent_process(int fd, short which, void *arg);
+void item_lock(uint32_t hv) {
+ mutex_lock(&item_locks[hv % ITEM_LOCKS]);
+}
+
+void item_unlock(uint32_t hv) {
+ pthread_mutex_unlock(&item_locks[hv % ITEM_LOCKS]);
+}
+
/*
* Initializes a connection queue.
*/
@@ -342,9 +354,9 @@ item *item_get(const char *key, const size_t nkey) {
item *it;
uint32_t hv;
hv = hash(key, nkey, 0);
- mutex_lock(&cache_lock);
+ item_lock(hv);
it = do_item_get(key, nkey, hv);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
return it;
}
@@ -352,9 +364,9 @@ item *item_touch(const char *key, size_t nkey, uint32_t exptime) {
item *it;
uint32_t hv;
hv = hash(key, nkey, 0);
- mutex_lock(&cache_lock);
+ item_lock(hv);
it = do_item_touch(key, nkey, exptime, hv);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
return it;
}
@@ -366,9 +378,9 @@ int item_link(item *item) {
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey, 0);
- mutex_lock(&cache_lock);
+ item_lock(hv);
ret = do_item_link(item, hv);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
return ret;
}
@@ -377,9 +389,12 @@ int item_link(item *item) {
* needed.
*/
void item_remove(item *item) {
- mutex_lock(&cache_lock);
+ uint32_t hv;
+ hv = hash(ITEM_key(item), item->nkey, 0);
+
+ item_lock(hv);
do_item_remove(item);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
}
/*
@@ -397,18 +412,21 @@ int item_replace(item *old_it, item *new_it, const uint32_t hv) {
void item_unlink(item *item) {
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey, 0);
- mutex_lock(&cache_lock);
+ item_lock(hv);
do_item_unlink(item, hv);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
}
/*
* Moves an item to the back of the LRU queue.
*/
void item_update(item *item) {
- mutex_lock(&cache_lock);
+ uint32_t hv;
+ hv = hash(ITEM_key(item), item->nkey, 0);
+
+ item_lock(hv);
do_item_update(item);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
}
/*
@@ -422,9 +440,9 @@ enum delta_result_type add_delta(conn *c, const char *key,
uint32_t hv;
hv = hash(key, nkey, 0);
- mutex_lock(&cache_lock);
+ item_lock(hv);
ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
return ret;
}
@@ -436,9 +454,9 @@ enum store_item_type store_item(item *item, int comm, conn* c) {
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey, 0);
- mutex_lock(&cache_lock);
+ item_lock(hv);
ret = do_store_item(item, comm, c, hv);
- pthread_mutex_unlock(&cache_lock);
+ item_unlock(hv);
return ret;
}
@@ -616,6 +634,15 @@ void thread_init(int nthreads, struct event_base *main_base) {
pthread_mutex_init(&cqi_freelist_lock, NULL);
cqi_freelist = NULL;
+ item_locks = calloc(ITEM_LOCKS, sizeof(pthread_mutex_t));
+ if (! item_locks) {
+ perror("Can't allocate item locks");
+ exit(1);
+ }
+ for (i = 0; i < ITEM_LOCKS; i++) {
+ pthread_mutex_init(&item_locks[i], NULL);
+ }
+
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
if (! threads) {
perror("Can't allocate thread descriptors");