summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c131
1 files changed, 114 insertions, 17 deletions
diff --git a/thread.c b/thread.c
index 93867a5..086b783 100644
--- a/thread.c
+++ b/thread.c
@@ -57,8 +57,12 @@ static pthread_mutex_t cqi_freelist_lock;
static pthread_mutex_t *item_locks;
/* size of the item lock hash table */
static uint32_t item_lock_count;
-/* size - 1 for lookup masking */
-static uint32_t item_lock_mask;
+#define hashsize(n) ((unsigned long int)1<<(n))
+#define hashmask(n) (hashsize(n)-1)
+/* this lock is temporarily engaged during a hash table expansion */
+static pthread_mutex_t item_global_lock;
+/* thread-specific variable for deeply finding the item lock type */
+static pthread_key_t item_lock_type_key;
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
@@ -108,16 +112,92 @@ unsigned short refcount_decr(unsigned short *refcount) {
#endif
}
+/* Convenience functions for calling *only* when in ITEM_LOCK_GLOBAL mode */
+void item_lock_global(void) {
+ mutex_lock(&item_global_lock);
+}
+
+void item_unlock_global(void) {
+ mutex_unlock(&item_global_lock);
+}
+
void item_lock(uint32_t hv) {
- mutex_lock(&item_locks[hv & item_lock_mask]);
+ uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
+ if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
+ mutex_lock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);
+ } else {
+ mutex_lock(&item_global_lock);
+ }
}
-int item_trylock(uint32_t hv) {
- return pthread_mutex_trylock(&item_locks[hv & item_lock_mask]);
+/* Special case. When ITEM_LOCK_GLOBAL mode is enabled, this should become a
+ * no-op, as it's only called from within the item lock if necessary.
+ * However, we can't mix a no-op and threads which are still synchronizing to
+ * GLOBAL. So instead we just always try to lock. When in GLOBAL mode this
+ * turns into an effective no-op. Threads re-synchronize after the power level
+ * switch so it should stay safe.
+ */
+void *item_trylock(uint32_t hv) {
+ pthread_mutex_t *lock = &item_locks[(hv & hashmask(hashpower)) % item_lock_count];
+ if (pthread_mutex_trylock(lock) == 0) {
+ return lock;
+ }
+ return NULL;
+}
+
+void item_trylock_unlock(void *lock) {
+ mutex_unlock((pthread_mutex_t *) lock);
}
void item_unlock(uint32_t hv) {
- mutex_unlock(&item_locks[hv & item_lock_mask]);
+ uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
+ if (likely(*lock_type == ITEM_LOCK_GRANULAR)) {
+ mutex_unlock(&item_locks[(hv & hashmask(hashpower)) % item_lock_count]);
+ } else {
+ mutex_unlock(&item_global_lock);
+ }
+}
+
+static void wait_for_thread_registration(int nthreads) {
+ while (init_count < nthreads) {
+ pthread_cond_wait(&init_cond, &init_lock);
+ }
+}
+
+static void register_thread_initialized(void) {
+ pthread_mutex_lock(&init_lock);
+ init_count++;
+ pthread_cond_signal(&init_cond);
+ pthread_mutex_unlock(&init_lock);
+}
+
+void switch_item_lock_type(enum item_lock_types type) {
+ char buf[1];
+ int i;
+
+ switch (type) {
+ case ITEM_LOCK_GRANULAR:
+ buf[0] = 'l';
+ break;
+ case ITEM_LOCK_GLOBAL:
+ buf[0] = 'g';
+ break;
+ default:
+ fprintf(stderr, "Unknown lock type: %d\n", type);
+ assert(1 == 0);
+ break;
+ }
+
+ pthread_mutex_lock(&init_lock);
+ init_count = 0;
+ for (i = 0; i < settings.num_threads; i++) {
+ if (write(threads[i].notify_send_fd, buf, 1) != 1) {
+ perror("Failed writing to notify pipe");
+ /* TODO: This is a fatal problem. Can it ever happen temporarily? */
+ }
+ }
+ wait_for_thread_registration(settings.num_threads);
+ pthread_mutex_unlock(&init_lock);
}
/*
@@ -282,7 +362,6 @@ static void setup_thread(LIBEVENT_THREAD *me) {
}
}
-
/*
* Worker thread: main event loop
*/
@@ -293,10 +372,14 @@ static void *worker_libevent(void *arg) {
* all threads have finished initializing.
*/
- pthread_mutex_lock(&init_lock);
- init_count++;
- pthread_cond_signal(&init_cond);
- pthread_mutex_unlock(&init_lock);
+ /* set an indexable thread-specific memory item for the lock type.
+ * this could be unnecessary if we pass the conn *c struct through
+ * all item_lock calls...
+ */
+ me->item_lock_type = ITEM_LOCK_GRANULAR;
+ pthread_setspecific(item_lock_type_key, &me->item_lock_type);
+
+ register_thread_initialized();
event_base_loop(me->base, 0);
return NULL;
@@ -316,6 +399,8 @@ static void thread_libevent_process(int fd, short which, void *arg) {
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
+ switch (buf[0]) {
+ case 'c':
item = cq_pop(me->new_conn_queue);
if (NULL != item) {
@@ -337,6 +422,17 @@ static void thread_libevent_process(int fd, short which, void *arg) {
}
cqi_free(item);
}
+ break;
+ /* we were told to flip the lock type and report in */
+ case 'l':
+ me->item_lock_type = ITEM_LOCK_GRANULAR;
+ register_thread_initialized();
+ break;
+ case 'g':
+ me->item_lock_type = ITEM_LOCK_GLOBAL;
+ register_thread_initialized();
+ break;
+ }
}
/* Which thread we assigned a connection to most recently. */
@@ -350,6 +446,7 @@ static int last_thread = -1;
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();
+ char buf[1];
int tid = (last_thread + 1) % settings.num_threads;
LIBEVENT_THREAD *thread = threads + tid;
@@ -365,7 +462,8 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
cq_push(thread->new_conn_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
- if (write(thread->notify_send_fd, "", 1) != 1) {
+ buf[0] = 'c';
+ if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}
@@ -690,8 +788,7 @@ void thread_init(int nthreads, struct event_base *main_base) {
power = 13;
}
- item_lock_count = ((unsigned long int)1 << (power));
- item_lock_mask = item_lock_count - 1;
+ item_lock_count = hashsize(power);
item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
if (! item_locks) {
@@ -701,6 +798,8 @@ void thread_init(int nthreads, struct event_base *main_base) {
for (i = 0; i < item_lock_count; i++) {
pthread_mutex_init(&item_locks[i], NULL);
}
+ pthread_key_create(&item_lock_type_key, NULL);
+ pthread_mutex_init(&item_global_lock, NULL);
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
if (! threads) {
@@ -733,9 +832,7 @@ void thread_init(int nthreads, struct event_base *main_base) {
/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock(&init_lock);
- while (init_count < nthreads) {
- pthread_cond_wait(&init_cond, &init_lock);
- }
+ wait_for_thread_registration(nthreads);
pthread_mutex_unlock(&init_lock);
}