-rw-r--r--  assoc.c      35
-rw-r--r--  items.c       9
-rw-r--r--  items.h       2
-rw-r--r--  memcached.h  13
-rw-r--r--  thread.c     85
5 files changed, 73 insertions, 71 deletions
diff --git a/assoc.c b/assoc.c
index c35497f..066e00c 100644
--- a/assoc.c
+++ b/assoc.c
@@ -26,7 +26,7 @@
#include <pthread.h>
static pthread_cond_t maintenance_cond = PTHREAD_COND_INITIALIZER;
-
+static pthread_mutex_t maintenance_lock = PTHREAD_MUTEX_INITIALIZER;
typedef unsigned long int ub4; /* unsigned 4-byte quantities */
typedef unsigned char ub1; /* unsigned 1-byte quantities */
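The new maintenance_lock pairs with maintenance_cond, which until now borrowed the global cache_lock: a condition variable must always be waited on under one consistent mutex. A minimal sketch of the pairing as the rest of this diff uses it (names from the diff, loop body elided):

```c
#include <pthread.h>
#include <stdbool.h>

static pthread_cond_t  maintenance_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t maintenance_lock = PTHREAD_MUTEX_INITIALIZER;
static bool do_run_maintenance_thread = true;

static void *maintenance_sketch(void *arg) {
    (void)arg;
    pthread_mutex_lock(&maintenance_lock);   /* held for the thread's lifetime */
    while (do_run_maintenance_thread) {
        /* ... expansion work would go here ... */
        /* atomically releases maintenance_lock, sleeps, re-acquires it on wakeup */
        pthread_cond_wait(&maintenance_cond, &maintenance_lock);
    }
    pthread_mutex_unlock(&maintenance_lock);
    return NULL;
}
```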
@@ -147,11 +147,6 @@ static void assoc_start_expand(void) {
if (started_expanding)
return;
- /*With this condition, we can expanding holding only one item lock,
- * and it should always be false*/
- if (item_lock_hashpower >= hashpower)
- return;
-
started_expanding = true;
pthread_cond_signal(&maintenance_cond);
}
@@ -209,11 +204,11 @@ int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
static void *assoc_maintenance_thread(void *arg) {
+ mutex_lock(&maintenance_lock);
while (do_run_maintenance_thread) {
int ii = 0;
- /* As there is only one thread process expanding, and we hold the item
- * lock, it seems not necessary to hold the cache_lock . */
+ /* There is only one expansion thread, so no need for the global lock. */
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
item *it, *next;
int bucket;
@@ -223,7 +218,6 @@ static void *assoc_maintenance_thread(void *arg) {
* is the lowest N bits of the hv, and the bucket of item_locks is
* also the lowest M bits of hv, and N is greater than M.
* So we can process expanding with only one item_lock. cool! */
- /*Get item lock for the slot in old hashtable*/
if ((item_lock = item_trylock(expand_bucket))) {
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
next = it->h_next;
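The comment above carries the key invariant: every item chained in old_hashtable[expand_bucket] agrees on the lowest N bits of its hash value, and because the item-lock index uses only the lowest M bits with M < N (now enforced at startup, see the thread.c hunk below), those items all map to the same item lock. A compilable sketch of that implication (hashsize/hashmask mirror thread.c; the function itself is illustrative):

```c
#include <assert.h>
#include <stdint.h>

#define hashsize(n) ((unsigned long int)1 << (n))
#define hashmask(n) (hashsize(n) - 1)

/* If two items share a hash bucket (low N bits agree) and the lock table
 * is strictly coarser (M < N), they necessarily share an item lock
 * (low M bits agree too). */
static void covers_whole_bucket(uint32_t hv1, uint32_t hv2,
                                unsigned int N, unsigned int M) {
    assert(M < N);
    if ((hv1 & hashmask(N)) == (hv2 & hashmask(N))) {
        assert((hv1 & hashmask(M)) == (hv2 & hashmask(M)));
    }
}
```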
@@ -247,9 +241,7 @@ static void *assoc_maintenance_thread(void *arg) {
}
} else {
- /*wait for 100ms. since only one expanding thread, it's not
- * necessary to sleep a random value*/
- usleep(100*1000);
+ usleep(10*1000);
}
if (item_lock) {
@@ -260,11 +252,18 @@ static void *assoc_maintenance_thread(void *arg) {
if (!expanding) {
/* We are done expanding.. just wait for next invocation */
- mutex_lock(&cache_lock);
started_expanding = false;
- pthread_cond_wait(&maintenance_cond, &cache_lock);
+ pthread_cond_wait(&maintenance_cond, &maintenance_lock);
+ /* assoc_expand() swaps out the hash table entirely, so we need
+ * every thread to drop any references into the hash table while
+ * this happens.
+ * This is done instead of a more complex, possibly slower algorithm
+ * that would allow dynamic hash table expansion without causing
+ * significant wait times.
+ */
+ pause_threads(PAUSE_ALL_THREADS);
assoc_expand();
- mutex_unlock(&cache_lock);
+ pause_threads(RESUME_ALL_THREADS);
}
}
return NULL;
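Taken together with the thread.c changes below, this replaces the old "switch every worker to a global item lock" handshake with a stop-the-world pause around the table swap. A self-contained model of the protocol follows; all names are illustrative, and where memcached pokes each worker over its libevent notify pipe, this sketch has workers poll a flag at a point where they hold no table references:

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define NWORKERS 4

static pthread_mutex_t hang_lock = PTHREAD_MUTEX_INITIALIZER; /* worker_hang_lock analogue */
static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  init_cond = PTHREAD_COND_INITIALIZER;
static int init_count = 0;        /* workers checked in; guarded by init_lock */
static int pausing = 0;           /* pause requested; guarded by init_lock */
static atomic_bool quit = false;
static int table_generation = 0;  /* stands in for the hash table pointer */

static void *worker(void *arg) {
    (void)arg;
    while (!atomic_load(&quit)) {
        /* Loop top: a known point where no table references are held
         * (memcached's workers reach the equivalent point when a 'p'
         * byte arrives on their notify pipe). */
        pthread_mutex_lock(&init_lock);
        if (pausing) {
            init_count++;
            pthread_cond_signal(&init_cond);
            pthread_mutex_unlock(&init_lock);
            pthread_mutex_lock(&hang_lock);    /* pile up until resumed */
            pthread_mutex_unlock(&hang_lock);
        } else {
            pthread_mutex_unlock(&init_lock);
        }
        /* ... normal request work: the table may be read here ... */
    }
    return NULL;
}

int main(void) {
    pthread_t tids[NWORKERS];
    for (int i = 0; i < NWORKERS; i++)
        pthread_create(&tids[i], NULL, worker, NULL);

    /* PAUSE: close the gate, then wait until every worker has checked in. */
    pthread_mutex_lock(&init_lock);
    pausing = 1;
    init_count = 0;
    pthread_mutex_lock(&hang_lock);            /* gate closed before anyone parks */
    while (init_count < NWORKERS)
        pthread_cond_wait(&init_cond, &init_lock);
    pthread_mutex_unlock(&init_lock);

    table_generation++;                        /* assoc_expand() would swap tables here */

    /* RESUME: clear the request first, then open the gate. */
    pthread_mutex_lock(&init_lock);
    pausing = 0;
    pthread_mutex_unlock(&init_lock);
    pthread_mutex_unlock(&hang_lock);

    atomic_store(&quit, true);
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(tids[i], NULL);
    printf("swapped to generation %d while all workers were parked\n", table_generation);
    return 0;
}
```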
@@ -281,6 +280,7 @@ int start_assoc_maintenance_thread() {
hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
}
}
+ pthread_mutex_init(&maintenance_lock, NULL);
if ((ret = pthread_create(&maintenance_tid, NULL,
assoc_maintenance_thread, NULL)) != 0) {
fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
@@ -290,13 +290,12 @@ int start_assoc_maintenance_thread() {
}
void stop_assoc_maintenance_thread() {
- mutex_lock(&cache_lock);
+ mutex_lock(&maintenance_lock);
do_run_maintenance_thread = 0;
pthread_cond_signal(&maintenance_cond);
- mutex_unlock(&cache_lock);
+ mutex_unlock(&maintenance_lock);
/* Wait for the maintenance thread to stop */
pthread_join(maintenance_tid, NULL);
}
-
diff --git a/items.c b/items.c
index 8315d78..08347c1 100644
--- a/items.c
+++ b/items.c
@@ -958,6 +958,15 @@ enum crawler_result_type lru_crawler_crawl(char *slabs) {
return CRAWLER_OK;
}
+/* If we hold this lock, the crawler can't wake up or move */
+void lru_crawler_pause(void) {
+ pthread_mutex_lock(&lru_crawler_lock);
+}
+
+void lru_crawler_resume(void) {
+ pthread_mutex_unlock(&lru_crawler_lock);
+}
+
int init_lru_crawler(void) {
if (lru_crawler_initialized == 0) {
if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) {
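These helpers work because, as the comment above implies, the crawler thread holds lru_crawler_lock for each pass and otherwise sits in pthread_cond_wait on lru_crawler_cond; an outsider holding the mutex therefore keeps it from waking or advancing. A minimal sketch of that pattern under those assumptions (not the crawler's real loop):

```c
#include <pthread.h>

static pthread_mutex_t crawler_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  crawler_cond = PTHREAD_COND_INITIALIZER;
static int do_run_crawler = 1;

/* Crawler side: the lock is held for the whole pass, and pthread_cond_wait
 * re-acquires it before returning, so a paused crawler can neither wake
 * nor move. */
static void *crawler_thread(void *arg) {
    (void)arg;
    pthread_mutex_lock(&crawler_lock);
    while (do_run_crawler) {
        pthread_cond_wait(&crawler_cond, &crawler_lock);
        /* ... walk LRU tails, reclaiming expired items ... */
    }
    pthread_mutex_unlock(&crawler_lock);
    return NULL;
}

/* Pauser side: exactly the shape of lru_crawler_pause()/lru_crawler_resume(). */
static void crawler_pause(void)  { pthread_mutex_lock(&crawler_lock); }
static void crawler_resume(void) { pthread_mutex_unlock(&crawler_lock); }
```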
diff --git a/items.h b/items.h
index d86c776..51ae39e 100644
--- a/items.h
+++ b/items.h
@@ -36,3 +36,5 @@ int start_item_crawler_thread(void);
int stop_item_crawler_thread(void);
int init_lru_crawler(void);
enum crawler_result_type lru_crawler_crawl(char *slabs);
+void lru_crawler_pause(void);
+void lru_crawler_resume(void);
diff --git a/memcached.h b/memcached.h
index 15595f8..36b3e65 100644
--- a/memcached.h
+++ b/memcached.h
@@ -188,9 +188,11 @@ enum network_transport {
udp_transport
};
-enum item_lock_types {
- ITEM_LOCK_GRANULAR = 0,
- ITEM_LOCK_GLOBAL
+enum pause_thread_types {
+ PAUSE_WORKER_THREADS = 0,
+ PAUSE_ALL_THREADS,
+ RESUME_ALL_THREADS,
+ RESUME_WORKER_THREADS
};
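The enum reads as two pause/resume pairs, and the thread.c hunk below makes PAUSE_ALL_THREADS fall through to the worker case, so "all" is a strict superset of "workers". Callers are expected to use the pairs symmetrically; a hypothetical caller (resize_safely is illustrative, not a memcached function):

```c
#include "memcached.h"

static void resize_safely(void) {
    /* A full pause stops the slab rebalancer and LRU crawler before
     * parking the workers; the resume mirrors the pause. */
    pause_threads(PAUSE_ALL_THREADS);
    /* ... exclusive section: no other memcached thread is running ... */
    pause_threads(RESUME_ALL_THREADS);
}
```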
#define IS_UDP(x) (x == udp_transport)
@@ -389,7 +391,6 @@ typedef struct {
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
- uint8_t item_lock_type; /* use fine-grained or global item lock */
} LIBEVENT_THREAD;
typedef struct {
@@ -574,13 +575,11 @@ void item_stats_sizes(ADD_STAT add_stats, void *c);
void item_unlink(item *it);
void item_update(item *it);
-void item_lock_global(void);
-void item_unlock_global(void);
void item_lock(uint32_t hv);
void *item_trylock(uint32_t hv);
void item_trylock_unlock(void *arg);
void item_unlock(uint32_t hv);
-void switch_item_lock_type(enum item_lock_types type);
+void pause_threads(enum pause_thread_types type);
unsigned short refcount_incr(unsigned short *refcount);
unsigned short refcount_decr(unsigned short *refcount);
void STATS_LOCK(void);
diff --git a/thread.c b/thread.c
index 3cc6490..2d12b7c 100644
--- a/thread.c
+++ b/thread.c
@@ -49,6 +49,9 @@ pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Lock for global stats */
static pthread_mutex_t stats_lock;
+/* Lock to cause worker threads to hang up after being woken */
+static pthread_mutex_t worker_hang_lock;
+
/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;
@@ -59,10 +62,6 @@ static uint32_t item_lock_count;
unsigned int item_lock_hashpower;
#define hashsize(n) ((unsigned long int)1<<(n))
#define hashmask(n) (hashsize(n)-1)
-/* this lock is temporarily engaged during a hash table expansion */
-static pthread_mutex_t item_global_lock;
-/* thread-specific variable for deeply finding the item lock type */
-static pthread_key_t item_lock_type_key;
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
@@ -112,22 +111,8 @@ unsigned short refcount_decr(unsigned short *refcount) {
#endif
}
-/* Convenience functions for calling *only* when in ITEM_LOCK_GLOBAL mode */
-void item_lock_global(void) {
- mutex_lock(&item_global_lock);
-}
-
-void item_unlock_global(void) {
- mutex_unlock(&item_global_lock);
-}
-
void item_lock(uint32_t hv) {
- uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
- if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
- mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
- } else {
- mutex_lock(&item_global_lock);
- }
+ mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}
/* Special case. When ITEM_LOCK_GLOBAL mode is enabled, this should become a
@@ -150,12 +135,7 @@ void item_trylock_unlock(void *lock) {
}
void item_unlock(uint32_t hv) {
- uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
- if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
- mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
- } else {
- mutex_unlock(&item_global_lock);
- }
+ mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}
static void wait_for_thread_registration(int nthreads) {
@@ -169,18 +149,32 @@ static void register_thread_initialized(void) {
init_count++;
pthread_cond_signal(&init_cond);
pthread_mutex_unlock(&init_lock);
+ /* Force worker threads to pile up if someone wants us to */
+ pthread_mutex_lock(&worker_hang_lock);
+ pthread_mutex_unlock(&worker_hang_lock);
}
-void switch_item_lock_type(enum item_lock_types type) {
+/* Must not be called with any deeper locks held:
+ * item locks, cache_lock, stats_lock, etc.
+ */
+void pause_threads(enum pause_thread_types type) {
char buf[1];
int i;
+ buf[0] = 0;
switch (type) {
- case ITEM_LOCK_GRANULAR:
- buf[0] = 'l';
+ case PAUSE_ALL_THREADS:
+ slabs_rebalancer_pause();
+ lru_crawler_pause();
+ /* FALLTHROUGH: a full pause also parks the worker threads */
+ case PAUSE_WORKER_THREADS:
+ buf[0] = 'p';
+ pthread_mutex_lock(&worker_hang_lock);
break;
- case ITEM_LOCK_GLOBAL:
- buf[0] = 'g';
+ case RESUME_ALL_THREADS:
+ slabs_rebalancer_resume();
+ lru_crawler_resume();
+ /* FALLTHROUGH: a full resume also releases the worker threads */
+ case RESUME_WORKER_THREADS:
+ pthread_mutex_unlock(&worker_hang_lock);
break;
default:
fprintf(stderr, "Unknown lock type: %d\n", type);
@@ -188,6 +182,11 @@ void switch_item_lock_type(enum item_lock_types type) {
break;
}
+ /* Only send a message if we have one. */
+ if (buf[0] == 0) {
+ return;
+ }
+
pthread_mutex_lock(&init_lock);
init_count = 0;
for (i = 0; i < settings.num_threads; i++) {
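The hunk cuts off at the pre-existing notification loop. For context, the tail of pause_threads() is unchanged by this commit, so it is sketched here from the surrounding file rather than shown by the diff (treat the exact field names as assumptions): the 'p' byte is written to each worker's notify pipe, then the caller blocks until every worker has parked.

```c
        /* sketch of the unchanged tail of pause_threads() */
        if (write(threads[i].notify_send_fd, buf, 1) != 1) {
            perror("Failed writing to notify pipe");
        }
    }
    wait_for_thread_registration(settings.num_threads);
    pthread_mutex_unlock(&init_lock);
}
```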
@@ -374,13 +373,6 @@ static void *worker_libevent(void *arg) {
* all threads have finished initializing.
*/
- /* set an indexable thread-specific memory item for the lock type.
- * this could be unnecessary if we pass the conn *c struct through
- * all item_lock calls...
- */
- me->item_lock_type = ITEM_LOCK_GRANULAR;
- pthread_setspecific(item_lock_type_key, &me->item_lock_type);
-
register_thread_initialized();
event_base_loop(me->base, 0);
@@ -425,13 +417,8 @@ static void thread_libevent_process(int fd, short which, void *arg) {
cqi_free(item);
}
break;
- /* we were told to flip the lock type and report in */
- case 'l':
- me->item_lock_type = ITEM_LOCK_GRANULAR;
- register_thread_initialized();
- break;
- case 'g':
- me->item_lock_type = ITEM_LOCK_GLOBAL;
+ /* we were told to pause and report in */
+ case 'p':
register_thread_initialized();
break;
}
@@ -784,6 +771,7 @@ void memcached_thread_init(int nthreads, struct event_base *main_base) {
pthread_mutex_init(&cache_lock, NULL);
pthread_mutex_init(&stats_lock, NULL);
+ pthread_mutex_init(&worker_hang_lock, NULL);
pthread_mutex_init(&init_lock, NULL);
pthread_cond_init(&init_cond, NULL);
@@ -803,6 +791,13 @@ void memcached_thread_init(int nthreads, struct event_base *main_base) {
power = 13;
}
+ if (power >= hashpower) {
+ fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
+ fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
+ fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
+ exit(1);
+ }
+
item_lock_count = hashsize(power);
item_lock_hashpower = power;
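With the defaults in this tree the new check is far from triggering: hashpower starts at 16 (HASHPOWER_DEFAULT) while the lock-table power caps at 13 even with four or more workers, so only an explicitly lowered `-o hashpower=` setting can trip it. A standalone illustration of the same comparison (numbers as assumed above):

```c
#include <stdio.h>

int main(void) {
    int hashpower = 16;  /* HASHPOWER_DEFAULT; raised via -o hashpower=N */
    int power = 13;      /* item-lock table power chosen for 4+ workers */

    /* Same test as the startup code: the lock table must be strictly
     * coarser than the hash table, or single-lock expansion breaks. */
    if (power >= hashpower)
        printf("refused: lock power %d >= hash power %d\n", power, hashpower);
    else
        printf("ok: lock power %d < hash power %d\n", power, hashpower);
    return 0;
}
```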
@@ -814,8 +809,6 @@ void memcached_thread_init(int nthreads, struct event_base *main_base) {
for (i = 0; i < item_lock_count; i++) {
pthread_mutex_init(&item_locks[i], NULL);
}
- pthread_key_create(&item_lock_type_key, NULL);
- pthread_mutex_init(&item_global_lock, NULL);
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
if (! threads) {