summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteven Grimm <sgrimm@facebook.com>2007-04-16 15:34:03 +0000
committerSteven Grimm <sgrimm@facebook.com>2007-04-16 15:34:03 +0000
commit56b8339e0606c1e59987c8d6368dfe727f3914b8 (patch)
tree6a3b5a81e147e26321f88ff679d345cb3cea025f
parent5b304997cc5e19cc9eb6856b70716574492c2989 (diff)
downloadmemcached-56b8339e0606c1e59987c8d6368dfe727f3914b8.tar.gz
Merge multithreaded into trunk, commit #2 (first commit only did the
new files, not the modified ones.) git-svn-id: http://code.sixapart.com/svn/memcached/trunk/server@509 b0b603af-a30f-0410-a34e-baf09ae79d0b
-rw-r--r--ChangeLog21
-rw-r--r--Makefile.am2
-rw-r--r--assoc.c43
-rw-r--r--assoc.h3
-rw-r--r--configure.ac10
-rw-r--r--doc/memcached.112
-rw-r--r--items.c119
-rw-r--r--items.h18
-rw-r--r--memcached.c497
-rw-r--r--memcached.h123
-rw-r--r--slabs.c25
-rw-r--r--slabs.h8
-rwxr-xr-xt/stats.t2
13 files changed, 622 insertions, 261 deletions
diff --git a/ChangeLog b/ChangeLog
index b4ed32b..31504d5 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -69,6 +69,17 @@
* Explicitly compare against NULL or zero in many places.
+2007-03-05
+ * Steven Grimm <sgrimm@facebook.com>: Per-object-type stats collection
+ support. Specify the object type delimiter with the -D command line
+ option. Turn stats gathering on and off with "stats detail on" and
+ "stats detail off". Dump the per-object-type details with
+ "stats detail dump".
+
+2007-03-01
+ * Steven Grimm <sgrimm@facebook.com>: Fix an off-by-one error in the
+ multithreaded version's message passing code.
+
2006-12-23
* fix expirations of items set with absolute expiration times in
the past, before the server's start time. bug was introduced in
@@ -97,10 +108,20 @@
* Steve Peters <steve@fisharerojo.org>: OpenBSD has a malloc.h,
but warns to use stdlib.h instead
+2006-11-22
+ * Steven Grimm <sgrimm@facebook.com>: Add support for multithreaded
+ execution. Run configure with "--enable-threads" to enable. See
+ doc/threads.txt for details.
+
2006-11-13
* Iain Wade <iwade@optusnet.com.au>: Fix for UDP responses on non-"get"
commands.
+2006-10-15
+ * Steven Grimm <sgrimm@facebook.com>: Dynamic sizing of hashtable to
+ reduce collisions on very large caches and conserve memory on
+ small caches.
+
2006-10-13
* Steven Grimm <sgrimm@facebook.com>: New faster hash function.
diff --git a/Makefile.am b/Makefile.am
index 98724eb..11c006c 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -1,6 +1,6 @@
bin_PROGRAMS = memcached memcached-debug
-memcached_SOURCES = memcached.c slabs.c slabs.h items.c items.h assoc.c assoc.h memcached.h
+memcached_SOURCES = memcached.c slabs.c slabs.h items.c items.h assoc.c assoc.h memcached.h thread.c stats.c stats.h
memcached_debug_SOURCES = $(memcached_SOURCES)
memcached_CPPFLAGS = -DNDEBUG
memcached_LDADD = @LIBOBJS@
diff --git a/assoc.c b/assoc.c
index 14f8f83..d6edc88 100644
--- a/assoc.c
+++ b/assoc.c
@@ -12,10 +12,9 @@
*
* $Id$
*/
-#include "config.h"
-#include <sys/types.h>
+
+#include "memcached.h"
#include <sys/stat.h>
-#include <sys/time.h>
#include <sys/socket.h>
#include <sys/signal.h>
#include <sys/resource.h>
@@ -24,13 +23,9 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
-#include <netinet/in.h>
#include <errno.h>
-#include <event.h>
#include <assert.h>
-#include "memcached.h"
-
/*
* Since the hash function does bit manipulation, it needs to know
* whether it's big or little-endian. ENDIAN_LITTLE and ENDIAN_BIG
@@ -142,7 +137,7 @@ and these came close:
}
#if HASH_LITTLE_ENDIAN == 1
-static uint32_t hash(
+uint32_t hash(
const void *key, /* the key to hash */
size_t length, /* length of the key */
const uint32_t initval) /* initval */
@@ -323,7 +318,7 @@ static uint32_t hash(
* from hashlittle() on all machines. hashbig() takes advantage of
* big-endian byte ordering.
*/
-static uint32_t hash( const void *key, size_t length, const uint32_t initval)
+uint32_t hash( const void *key, size_t length, const uint32_t initval)
{
uint32_t a,b,c;
union { const void *ptr; size_t i; } u; /* to cast key to (size_t) happily */
@@ -541,39 +536,41 @@ static void assoc_expand(void) {
primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
if (primary_hashtable) {
- if (settings.verbose > 1)
- fprintf(stderr, "Hash table expansion starting\n");
+ if (settings.verbose > 1)
+ fprintf(stderr, "Hash table expansion starting\n");
hashpower++;
expanding = 1;
expand_bucket = 0;
- assoc_move_next_bucket();
+ do_assoc_move_next_bucket();
} else {
primary_hashtable = old_hashtable;
- /* Bad news, but we can keep running. */
+ /* Bad news, but we can keep running. */
}
}
/* migrates the next bucket to the primary hashtable if we're expanding. */
-void assoc_move_next_bucket(void) {
+void do_assoc_move_next_bucket(void) {
item *it, *next;
int bucket;
if (expanding) {
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
- next = it->h_next;
+ next = it->h_next;
bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
it->h_next = primary_hashtable[bucket];
primary_hashtable[bucket] = it;
- }
+ }
- expand_bucket++;
- if (expand_bucket == hashsize(hashpower - 1)) {
- expanding = 0;
- free(old_hashtable);
- if (settings.verbose > 1)
- fprintf(stderr, "Hash table expansion done\n");
- }
+ old_hashtable[expand_bucket] = NULL;
+
+ expand_bucket++;
+ if (expand_bucket == hashsize(hashpower - 1)) {
+ expanding = 0;
+ free(old_hashtable);
+ if (settings.verbose > 1)
+ fprintf(stderr, "Hash table expansion done\n");
+ }
}
}
diff --git a/assoc.h b/assoc.h
index 398e015..3b32162 100644
--- a/assoc.h
+++ b/assoc.h
@@ -3,4 +3,5 @@ void assoc_init(void);
item *assoc_find(const char *key, const size_t nkey);
int assoc_insert(item *item);
void assoc_delete(const char *key, const size_t nkey);
-void assoc_move_next_bucket(void);
+void do_assoc_move_next_bucket(void);
+uint32_t hash( const void *key, size_t length, const uint32_t initval);
diff --git a/configure.ac b/configure.ac
index 19caccb..f45f7cf 100644
--- a/configure.ac
+++ b/configure.ac
@@ -94,6 +94,7 @@ dnl ----------------------------------------------------------------------------
AC_SEARCH_LIBS(socket, socket)
AC_SEARCH_LIBS(gethostbyname, nsl)
AC_SEARCH_LIBS(mallinfo, malloc)
+AC_SEARCH_LIBS(pthread_create, pthread)
AC_CHECK_FUNC(daemon,AC_DEFINE([HAVE_DAEMON],,[Define this if you have daemon()]),[AC_LIBOBJ(daemon)])
@@ -156,6 +157,15 @@ fi
AC_C_ENDIAN
+dnl Check whether the user wants threads or not
+AC_ARG_ENABLE(threads,
+ [AS_HELP_STRING([--enable-threads],[support multithreaded execution])],
+ [if test "$ac_cv_search_pthread_create" != "no"; then
+ AC_DEFINE([USE_THREADS],,[Define this if you want to use pthreads])
+ else
+ AC_MSG_ERROR([Can't enable threads without the POSIX thread library.])
+ fi])
+
AC_CHECK_FUNCS(mlockall)
AC_CONFIG_FILES(Makefile doc/Makefile)
diff --git a/doc/memcached.1 b/doc/memcached.1
index dbd1c8e..fee6975 100644
--- a/doc/memcached.1
+++ b/doc/memcached.1
@@ -84,6 +84,18 @@ Print memcached and libevent licenses.
.TP
.B \-P <filename>
Print pidfile to <filename>, only used under -d option.
+.TP
+.B \-t <threads>
+Number of threads to use to process incoming requests. This option is only
+meaningful if memcached was compiled with thread support enabled. It is
+typically not useful to set this higher than the number of CPU cores on the
+memcached server.
+.TP
+.B \-D <char>
+Use <char> as the delimiter between key prefixes and IDs. This is used for
+per-prefix stats reporting. The default is ":" (colon). If this option is
+specified, stats collection is turned on automatically; if not, then it may
+be turned on by sending the "stats detail on" command to the server.
.br
.SH LICENSE
The memcached daemon is copyright Danga Interactive and is distributed under
diff --git a/items.c b/items.c
index ee2b391..2a33d59 100644
--- a/items.c
+++ b/items.c
@@ -1,8 +1,7 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* $Id$ */
-#include <sys/types.h>
+#include "memcached.h"
#include <sys/stat.h>
-#include <sys/time.h>
#include <sys/socket.h>
#include <sys/signal.h>
#include <sys/resource.h>
@@ -11,14 +10,10 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
-#include <netinet/in.h>
#include <errno.h>
#include <time.h>
-#include <event.h>
#include <assert.h>
-#include "memcached.h"
-
/* Forward Declarations */
static void item_link_q(item *it);
static void item_unlink_q(item *it);
@@ -44,6 +39,17 @@ void item_init(void) {
}
}
+/* Enable this for reference-count debugging. */
+#if 0
+# define DEBUG_REFCNT(it,op) \
+ fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
+ it, op, it->refcount, \
+ (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
+ (it->it_flags & ITEM_SLABBED) ? 'S' : ' ', \
+ (it->it_flags & ITEM_DELETED) ? 'D' : ' ')
+#else
+# define DEBUG_REFCNT(it,op) while(0)
+#endif
/*
* Generates the variable-sized part of the header for an object.
@@ -65,7 +71,7 @@ static size_t item_make_header(const uint8_t nkey, const int flags, const int nb
}
/*@null@*/
-item *item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes) {
+item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes) {
uint8_t nsuffix;
item *it;
char suffix[40];
@@ -98,9 +104,12 @@ item *item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t
for (search = tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
if (search->refcount == 0) {
- if (search->exptime > current_time)
+ if (search->exptime > current_time) {
+ STATS_LOCK();
stats.evictions++;
- item_unlink(search);
+ STATS_UNLOCK();
+ }
+ do_item_unlink(search);
break;
}
}
@@ -115,7 +124,8 @@ item *item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t
assert(it != heads[it->slabs_clsid]);
it->next = it->prev = it->h_next = 0;
- it->refcount = 0;
+ it->refcount = 1; /* the caller will have a reference */
+ DEBUG_REFCNT(it, '*');
it->it_flags = 0;
it->nkey = nkey;
it->nbytes = nbytes;
@@ -136,6 +146,7 @@ void item_free(item *it) {
/* so slab size changer can tell later if item is already free or not */
it->slabs_clsid = 0;
it->it_flags |= ITEM_SLABBED;
+ DEBUG_REFCNT(it, 'F');
slabs_free(it, ntotal);
}
@@ -192,57 +203,66 @@ static void item_unlink_q(item *it) {
return;
}
-int item_link(item *it) {
+int do_item_link(item *it) {
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
assert(it->nbytes < 1048576);
it->it_flags |= ITEM_LINKED;
it->time = current_time;
assoc_insert(it);
+ STATS_LOCK();
stats.curr_bytes += ITEM_ntotal(it);
stats.curr_items += 1;
stats.total_items += 1;
+ STATS_UNLOCK();
item_link_q(it);
return 1;
}
-void item_unlink(item *it) {
+void do_item_unlink(item *it) {
if ((it->it_flags & ITEM_LINKED) != 0) {
it->it_flags &= ~ITEM_LINKED;
+ STATS_LOCK();
stats.curr_bytes -= ITEM_ntotal(it);
stats.curr_items -= 1;
+ STATS_UNLOCK();
assoc_delete(ITEM_key(it), it->nkey);
item_unlink_q(it);
+ if (it->refcount == 0) item_free(it);
}
- if (it->refcount == 0) item_free(it);
}
-void item_remove(item *it) {
+void do_item_remove(item *it) {
assert((it->it_flags & ITEM_SLABBED) == 0);
- if (it->refcount != 0) it->refcount--;
+ if (it->refcount != 0) {
+ it->refcount--;
+ DEBUG_REFCNT(it, '-');
+ }
assert((it->it_flags & ITEM_DELETED) == 0 || it->refcount != 0);
if (it->refcount == 0 && (it->it_flags & ITEM_LINKED) == 0) {
item_free(it);
}
}
-void item_update(item *it) {
+void do_item_update(item *it) {
if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
assert((it->it_flags & ITEM_SLABBED) == 0);
- item_unlink_q(it);
- it->time = current_time;
- item_link_q(it);
+ if (it->it_flags & ITEM_LINKED) {
+ item_unlink_q(it);
+ it->time = current_time;
+ item_link_q(it);
+ }
}
}
-int item_replace(item *it, item *new_it) {
+int do_item_replace(item *it, item *new_it) {
assert((it->it_flags & ITEM_SLABBED) == 0);
- item_unlink(it);
- return item_link(new_it);
+ do_item_unlink(it);
+ return do_item_link(new_it);
}
/*@null@*/
@@ -337,8 +357,59 @@ char* item_stats_sizes(int *bytes) {
return buf;
}
+/* returns true if a deleted item's delete-locked-time is over, and it
+ should be removed from the namespace */
+int item_delete_lock_over (item *it) {
+ assert(it->it_flags & ITEM_DELETED);
+ return (current_time >= it->exptime);
+}
+
+/* wrapper around assoc_find which does the lazy expiration/deletion logic */
+item *do_item_get_notedeleted(char *key, size_t nkey, int *delete_locked) {
+ item *it = assoc_find(key, nkey);
+ if (delete_locked) *delete_locked = 0;
+ if (it && (it->it_flags & ITEM_DELETED)) {
+ /* it's flagged as delete-locked. let's see if that condition
+ is past due, and the 5-second delete_timer just hasn't
+ gotten to it yet... */
+ if (! item_delete_lock_over(it)) {
+ if (delete_locked) *delete_locked = 1;
+ it = 0;
+ }
+ }
+ if (it && settings.oldest_live && settings.oldest_live <= current_time &&
+ it->time <= settings.oldest_live) {
+ do_item_unlink(it); // MTSAFE - cache_lock held
+ it = 0;
+ }
+ if (it && it->exptime && it->exptime <= current_time) {
+ do_item_unlink(it); // MTSAFE - cache_lock held
+ it = 0;
+ }
+
+ if (it) {
+ it->refcount++;
+ DEBUG_REFCNT(it, '+');
+ }
+ return it;
+}
+
+item *item_get(char *key, size_t nkey) {
+ return item_get_notedeleted(key, nkey, 0);
+}
+
+/* returns an item whether or not it's delete-locked or expired. */
+item *do_item_get_nocheck(char *key, size_t nkey) {
+ item *it = assoc_find(key, nkey);
+ if (it) {
+ it->refcount++;
+ DEBUG_REFCNT(it, '+');
+ }
+ return it;
+}
+
/* expires items that are more recent than the oldest_live setting. */
-void item_flush_expired(void) {
+void do_item_flush_expired(void) {
int i;
item *iter, *next;
if (settings.oldest_live == 0)
@@ -353,7 +424,7 @@ void item_flush_expired(void) {
if (iter->time >= settings.oldest_live) {
next = iter->next;
if ((iter->it_flags & ITEM_SLABBED) == 0) {
- item_unlink(iter);
+ do_item_unlink(iter);
}
} else {
/* We've hit the first old item. Continue to the next queue. */
diff --git a/items.h b/items.h
index 695804f..db9b7a2 100644
--- a/items.h
+++ b/items.h
@@ -1,15 +1,15 @@
/* See items.c */
void item_init(void);
/*@null@*/
-item *item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes);
+item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes);
void item_free(item *it);
bool item_size_ok(const size_t nkey, const int flags, const int nbytes);
-int item_link(item *it); /* may fail if transgresses limits */
-void item_unlink(item *it);
-void item_remove(item *it);
-void item_update(item *it); /* update LRU time to current and reposition */
-int item_replace(item *it, item *new_it);
+int do_item_link(item *it); /* may fail if transgresses limits */
+void do_item_unlink(item *it);
+void do_item_remove(item *it);
+void do_item_update(item *it); /* update LRU time to current and reposition */
+int do_item_replace(item *it, item *new_it);
/*@null@*/
char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes);
@@ -17,4 +17,8 @@ void item_stats(char *buffer, const int buflen);
/*@null@*/
char *item_stats_sizes(int *bytes);
-void item_flush_expired(void);
+void do_item_flush_expired(void);
+item *item_get(char *key, size_t nkey);
+
+item *do_item_get_notedeleted(char *key, size_t nkey, int *delete_locked);
+item *do_item_get_nocheck(char *key, size_t nkey);
diff --git a/memcached.c b/memcached.c
index 4ce15c0..0f303ce 100644
--- a/memcached.c
+++ b/memcached.c
@@ -15,10 +15,8 @@
*
* $Id$
*/
-#include "config.h"
-#include <sys/types.h>
+#include "memcached.h"
#include <sys/stat.h>
-#include <sys/time.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/signal.h>
@@ -41,12 +39,10 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
-#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <errno.h>
#include <time.h>
-#include <event.h>
#include <assert.h>
#include <limits.h>
@@ -64,8 +60,6 @@
#endif
#endif
-#include "memcached.h"
-
/*
* forward declarations
*/
@@ -85,7 +79,6 @@ static void settings_init(void);
/* event handling, network IO */
static void event_handler(const int fd, const short which, void *arg);
-static conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const bool is_udp);
static void conn_close(conn *c);
static void conn_init(void);
static void accept_new_conns(const bool do_accept);
@@ -115,6 +108,7 @@ static item **todelete = 0;
static int delcurr;
static int deltotal;
static conn *listen_conn;
+static struct event_base *main_base;
#define TRANSMIT_COMPLETE 0
#define TRANSMIT_INCOMPLETE 1
@@ -159,12 +153,16 @@ static void stats_init(void) {
like 'settings.oldest_live' which act as booleans as well as
values are now false in boolean context... */
stats.started = time(0) - 2;
+ stats_prefix_init();
}
static void stats_reset(void) {
+ STATS_LOCK();
stats.total_items = stats.total_conns = 0;
stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0;
stats.bytes_read = stats.bytes_written = 0;
+ stats_prefix_clear();
+ STATS_UNLOCK();
}
static void settings_init(void) {
@@ -180,6 +178,13 @@ static void settings_init(void) {
settings.managed = false;
settings.factor = 1.25;
settings.chunk_size = 48; /* space for a modest key and value */
+#ifdef USE_THREADS
+ settings.num_threads = 4;
+#else
+ settings.num_threads = 1;
+#endif
+ settings.prefix_delimiter = ':';
+ settings.detail_enabled = 0;
}
/* returns true if a deleted item's delete-locked-time is over, and it
@@ -189,37 +194,6 @@ static bool item_delete_lock_over (item *it) {
return (current_time >= it->exptime);
}
-/* wrapper around assoc_find which does the lazy expiration/deletion logic */
-static item *get_item_notedeleted(const char *key, const size_t nkey, int *delete_locked) {
- item *it = assoc_find(key, nkey);
-
- if (delete_locked) *delete_locked = 0;
-
- if (it != NULL && (it->it_flags & ITEM_DELETED)) {
- /* it's flagged as delete-locked. let's see if that condition
- is past due, and the 5-second delete_timer just hasn't
- gotten to it yet... */
- if (! item_delete_lock_over(it)) {
- if (delete_locked) *delete_locked = 1;
- it = 0;
- }
- }
- if (it != NULL && settings.oldest_live != 0 && settings.oldest_live <= current_time &&
- it->time <= settings.oldest_live) {
- item_unlink(it);
- it = 0;
- }
- if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
- item_unlink(it);
- it = 0;
- }
- return it;
-}
-
-static item *get_item(const char *key, const size_t nkey) {
- return get_item_notedeleted(key, nkey, 0);
-}
-
/*
* Adds a message header to a connection.
*
@@ -260,10 +234,16 @@ static int add_msghdr(conn *c)
return 0;
}
+
+/*
+ * Free list management for connections.
+ */
+
static conn **freeconns;
static int freetotal;
static int freecurr;
+
static void conn_init(void) {
freetotal = 200;
freecurr = 0;
@@ -273,15 +253,48 @@ static void conn_init(void) {
return;
}
-/*@null@*/
-static conn *conn_new(const int sfd, const int init_state, const int event_flags,
- const int read_buffer_size, const bool is_udp) {
+/*
+ * Returns a connection from the freelist, if any. Should call this using
+ * conn_from_freelist() for thread safety.
+ */
+conn *do_conn_from_freelist() {
conn *c;
- /* do we have a free conn structure from a previous close? */
if (freecurr > 0) {
c = freeconns[--freecurr];
- } else { /* allocate a new one */
+ } else {
+ c = NULL;
+ }
+
+ return c;
+}
+
+/*
+ * Adds a connection to the freelist. 0 = success. Should call this using
+ * conn_add_to_freelist() for thread safety.
+ */
+int do_conn_add_to_freelist(conn *c) {
+ if (freecurr < freetotal) {
+ freeconns[freecurr++] = c;
+ return 0;
+ } else {
+ /* try to enlarge free connections array */
+ conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2);
+ if (new_freeconns) {
+ freetotal *= 2;
+ freeconns = new_freeconns;
+ freeconns[freecurr++] = c;
+ return 0;
+ }
+ }
+ return 1;
+}
+
+conn *conn_new(const int sfd, const int init_state, const int event_flags,
+ const int read_buffer_size, const bool is_udp, struct event_base *base) {
+ conn *c = conn_from_freelist();
+
+ if (NULL == c) {
if (!(c = (conn *)malloc(sizeof(conn)))) {
perror("malloc()");
return NULL;
@@ -317,7 +330,9 @@ static conn *conn_new(const int sfd, const int init_state, const int event_flags
return NULL;
}
+ STATS_LOCK();
stats.conn_structs++;
+ STATS_UNLOCK();
}
if (settings.verbose > 1) {
@@ -350,26 +365,20 @@ static conn *conn_new(const int sfd, const int init_state, const int event_flags
c->gen = 0;
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
+ event_base_set(base, &c->event);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) {
- if (freecurr < freetotal) {
- freeconns[freecurr++] = c;
- } else {
- if (c->hdrbuf)
- free (c->hdrbuf);
- free (c->msglist);
- free (c->rbuf);
- free (c->wbuf);
- free (c->ilist);
- free (c->iov);
- free (c);
+ if (conn_add_to_freelist(c)) {
+ conn_free(c);
}
return NULL;
}
+ STATS_LOCK();
stats.curr_conns++;
stats.total_conns++;
+ STATS_UNLOCK();
return c;
}
@@ -378,7 +387,7 @@ static void conn_cleanup(conn *c) {
assert(c != NULL);
if (c->item) {
- item_free(c->item);
+ item_remove(c->item);
c->item = 0;
}
@@ -397,7 +406,7 @@ static void conn_cleanup(conn *c) {
/*
* Frees a connection.
*/
-static void conn_free(conn *c) {
+void conn_free(conn *c) {
if (c) {
if (c->hdrbuf)
free(c->hdrbuf);
@@ -429,24 +438,13 @@ static void conn_close(conn *c) {
conn_cleanup(c);
/* if the connection has big buffers, just free it */
- if (c->rsize > READ_BUFFER_HIGHWAT) {
+ if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
conn_free(c);
- } else if (freecurr < freetotal) {
- /* if we have enough space in the free connections array, put the structure there */
- freeconns[freecurr++] = c;
- } else {
- /* try to enlarge free connections array */
- conn **new_freeconns = realloc((void *)freeconns, sizeof(conn *) * freetotal * 2);
- if (new_freeconns) {
- freetotal *= 2;
- freeconns = new_freeconns;
- freeconns[freecurr++] = c;
- } else {
- conn_free(c);
- }
}
+ STATS_LOCK();
stats.curr_conns--;
+ STATS_UNLOCK();
return;
}
@@ -689,56 +687,63 @@ static void complete_nread(conn *c) {
item *it = c->item;
int comm = c->item_comm;
- item *old_it;
- int delete_locked = 0;
- char *key = ITEM_key(it);
+ STATS_LOCK();
stats.set_cmds++;
+ STATS_UNLOCK();
if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
out_string(c, "CLIENT_ERROR bad data chunk");
- goto err;
+ } else {
+ if (store_item(it, comm)) {
+ out_string(c, "STORED");
+ } else {
+ out_string(c, "NOT_STORED");
+ }
}
- old_it = get_item_notedeleted(key, it->nkey, &delete_locked);
-
- if (old_it != NULL && comm == NREAD_ADD) {
- item_update(old_it); /* touches item, promotes to head of LRU */
- out_string(c, "NOT_STORED");
- goto err;
- }
+ item_remove(c->item); /* release the c->item reference */
+ c->item = 0;
+}
- if (old_it == NULL && comm == NREAD_REPLACE) {
- out_string(c, "NOT_STORED");
- goto err;
- }
+/*
+ * Stores an item in the cache according to the semantics of one of the set
+ * commands. In threaded mode, this is protected by the cache lock.
+ *
+ * Returns true if the item was stored.
+ */
+int do_store_item(item *it, int comm) {
+ char *key = ITEM_key(it);
+ int delete_locked = 0;
+ item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
+ int stored = 0;
- if (delete_locked != 0) {
- if (comm == NREAD_REPLACE || comm == NREAD_ADD) {
- out_string(c, "NOT_STORED");
- goto err;
- }
+ if (old_it != NULL && comm == NREAD_ADD) {
+ /* add only adds a nonexistent item, but promote to head of LRU */
+ do_item_update(old_it);
+ } else if (!old_it && comm == NREAD_REPLACE) {
+ /* replace only replaces an existing value; don't store */
+ } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
+ /* replace and add can't override delete locks; don't store */
+ } else {
+ /* "set" commands can override the delete lock
+ window... in which case we have to find the old hidden item
+ that's in the namespace/LRU but wasn't returned by
+ item_get.... because we need to replace it */
+ if (delete_locked)
+ old_it = do_item_get_nocheck(key, it->nkey);
+
+ if (old_it != NULL)
+ do_item_replace(old_it, it);
+ else
+ do_item_link(it);
- /* but "set" commands can override the delete lock
- window... in which case we have to find the old hidden item
- that's in the namespace/LRU but wasn't returned by
- get_item.... because we need to replace it (below) */
- old_it = assoc_find(key, it->nkey);
+ stored = 1;
}
if (old_it)
- item_replace(old_it, it);
- else
- item_link(it);
-
- c->item = 0;
- out_string(c, "STORED");
- return;
-
-err:
- item_free(it);
- c->item = 0;
- return;
+ do_item_remove(old_it); /* release our reference */
+ return stored;
}
typedef struct token_s {
@@ -816,6 +821,36 @@ static size_t tokenize_command(char *command, token_t *tokens, const size_t max_
return ntokens;
}
+inline void process_stats_detail(conn *c, const char *command) {
+ assert(c != NULL);
+
+ if (strcmp(command, "on") == 0) {
+ settings.detail_enabled = 1;
+ out_string(c, "OK");
+ }
+ else if (strcmp(command, "off") == 0) {
+ settings.detail_enabled = 0;
+ out_string(c, "OK");
+ }
+ else if (strcmp(command, "dump") == 0) {
+ int len;
+ char *stats = stats_prefix_dump(&len);
+ if (NULL != stats) {
+ c->write_and_free = stats;
+ c->wcurr = stats;
+ c->wbytes = len;
+ conn_set_state(c, conn_write);
+ c->write_and_go = conn_read;
+ }
+ else {
+ out_string(c, "SERVER_ERROR");
+ }
+ }
+ else {
+ out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
+ }
+}
+
static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
rel_time_t now = current_time;
char *command;
@@ -838,6 +873,7 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
getrusage(RUSAGE_SELF, &usage);
+ STATS_LOCK();
pos += sprintf(pos, "STAT pid %u\r\n", pid);
pos += sprintf(pos, "STAT uptime %u\r\n", now);
pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
@@ -859,7 +895,9 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (unsigned long long)settings.maxbytes);
+ pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads);
pos += sprintf(pos, "END");
+ STATS_UNLOCK();
out_string(c, temp);
return;
}
@@ -989,6 +1027,14 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
return;
}
+ if (strcmp(subcommand, "detail") == 0) {
+ if (ntokens < 4)
+ process_stats_detail(c, ""); /* outputs the error message */
+ else
+ process_stats_detail(c, tokens[2].value);
+ return;
+ }
+
if (strcmp(subcommand, "sizes") == 0) {
int bytes = 0;
char *buf = item_stats_sizes(&bytes);
@@ -1042,8 +1088,13 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens)
return;
}
+ STATS_LOCK();
stats.get_cmds++;
- it = get_item(key, nkey);
+ STATS_UNLOCK();
+ it = item_get(key, nkey);
+ if (settings.detail_enabled) {
+ stats_prefix_record_get(key, NULL != it);
+ }
if (it) {
if (i >= c->isize) {
item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
@@ -1069,13 +1120,19 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens)
if (settings.verbose > 1)
fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
+ /* item_get() has incremented it->refcount for us */
+ STATS_LOCK();
stats.get_hits++;
- it->refcount++;
+ STATS_UNLOCK();
item_update(it);
*(c->ilist + i) = it;
i++;
- } else stats.get_misses++;
+ } else {
+ STATS_LOCK();
+ stats.get_misses++;
+ STATS_UNLOCK();
+ }
key_token++;
}
@@ -1135,6 +1192,10 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken
return;
}
+ if (settings.detail_enabled) {
+ stats_prefix_record_set(key);
+ }
+
if (settings.managed) {
int bucket = c->bucket;
if (bucket == -1) {
@@ -1166,18 +1227,14 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken
c->ritem = ITEM_data(it);
c->rlbytes = it->nbytes;
conn_set_state(c, conn_nread);
- return;
}
static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const int incr) {
char temp[32];
- unsigned int value;
item *it;
unsigned int delta;
char *key;
size_t nkey;
- int res;
- char *ptr;
assert(c != NULL);
@@ -1202,12 +1259,6 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt
}
}
- it = get_item(key, nkey);
- if (!it) {
- out_string(c, "NOT_FOUND");
- return;
- }
-
delta = strtoul(tokens[2].value, NULL, 10);
if(errno == ERANGE) {
@@ -1215,14 +1266,38 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt
return;
}
+ it = item_get(key, nkey);
+ if (!it) {
+ out_string(c, "NOT_FOUND");
+ return;
+ }
+
+ out_string(c, add_delta(it, incr, delta, temp));
+ item_remove(it); /* release our reference */
+}
+
+/*
+ * adds a delta value to a numeric item.
+ *
+ * it item to adjust
+ * incr true to increment value, false to decrement
+ * delta amount to adjust value by
+ * buf buffer for response string
+ *
+ * returns a response string to send back to the client.
+ */
+char *do_add_delta(item *it, int incr, unsigned int delta, char *buf) {
+ char *ptr;
+ unsigned int value;
+ int res;
+
ptr = ITEM_data(it);
while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++; // BUG: can't be true
value = strtol(ptr, NULL, 10);
if(errno == ERANGE) {
- out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
- return;
+ return "CLIENT_ERROR cannot increment or decrement non-numeric value";
}
if (incr != 0)
@@ -1231,24 +1306,24 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt
if (delta >= value) value = 0;
else value -= delta;
}
- snprintf(temp, 32, "%u", value);
- res = strlen(temp);
+ snprintf(buf, 32, "%u", value);
+ res = strlen(buf);
if (res + 2 > it->nbytes) { /* need to realloc */
item *new_it;
- new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
+ new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
if (new_it == 0) {
- out_string(c, "SERVER_ERROR out of memory");
- return;
+ return "SERVER_ERROR out of memory";
}
- memcpy(ITEM_data(new_it), temp, res);
+ memcpy(ITEM_data(new_it), buf, res);
memcpy(ITEM_data(new_it) + res, "\r\n", 3);
- item_replace(it, new_it);
+ do_item_replace(it, new_it);
+ do_item_remove(new_it); /* release our reference */
} else { /* replace in-place */
- memcpy(ITEM_data(it), temp, res);
+ memcpy(ITEM_data(it), buf, res);
memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
}
- out_string(c, temp);
- return;
+
+ return buf;
}
static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
@@ -1289,17 +1364,32 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken
}
}
- it = get_item(key, nkey);
- if (!it) {
- out_string(c, "NOT_FOUND");
- return;
+ if (settings.detail_enabled) {
+ stats_prefix_record_delete(key);
}
- if (exptime == 0) {
- item_unlink(it);
- out_string(c, "DELETED");
- return;
+ it = item_get(key, nkey);
+ if (it) {
+ if (exptime == 0) {
+ item_unlink(it);
+ item_remove(it); /* release our reference */
+ out_string(c, "DELETED");
+ } else {
+ /* our reference will be transfered to the delete queue */
+ out_string(c, defer_delete(it, exptime));
+ }
+ } else {
+ out_string(c, "NOT_FOUND");
}
+}
+
+/*
+ * Adds an item to the deferred-delete list so it can be reaped later.
+ *
+ * Returns the result to send to the client.
+ */
+char *do_defer_delete(item *it, time_t exptime)
+{
if (delcurr >= deltotal) {
item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
if (new_delete) {
@@ -1310,18 +1400,17 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken
* can't delete it immediately, user wants a delay,
* but we ran out of memory for the delete queue
*/
- out_string(c, "SERVER_ERROR out of memory");
- return;
+ item_remove(it); /* release reference */
+ return "SERVER_ERROR out of memory";
}
}
- it->refcount++;
/* use its expiration time as its deletion time now */
it->exptime = realtime(exptime);
it->it_flags |= ITEM_DELETED;
todelete[delcurr++] = it;
- out_string(c, "DELETED");
- return;
+
+ return "DELETED";
}
static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
@@ -1565,7 +1654,9 @@ static int try_read_udp(conn *c) {
0, &c->request_addr, &c->request_addr_size);
if (res > 8) {
unsigned char *buf = (unsigned char *)c->rbuf;
+ STATS_LOCK();
stats.bytes_read += res;
+ STATS_UNLOCK();
/* Beginning of UDP packet is the request ID; save it. */
c->request_id = buf[0] * 256 + buf[1];
@@ -1632,7 +1723,9 @@ static int try_read_network(conn *c) {
res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
if (res > 0) {
+ STATS_LOCK();
stats.bytes_read += res;
+ STATS_UNLOCK();
gotdata = 1;
c->rbytes += res;
continue;
@@ -1653,10 +1746,12 @@ static int try_read_network(conn *c) {
static bool update_event(conn *c, const int new_flags) {
assert(c != NULL);
+ struct event_base *base = c->event.ev_base;
if (c->ev_flags == new_flags)
return true;
if (event_del(&c->event) == -1) return false;
event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
+ event_base_set(base, &c->event);
c->ev_flags = new_flags;
if (event_add(&c->event, 0) == -1) return false;
return true;
@@ -1666,6 +1761,8 @@ static bool update_event(conn *c, const int new_flags) {
* Sets whether we are listening for new connections or not.
*/
void accept_new_conns(const bool do_accept) {
+ if (! is_listen_thread())
+ return;
if (do_accept) {
update_event(listen_conn, EV_READ | EV_PERSIST);
if (listen(listen_conn->sfd, 1024) != 0) {
@@ -1704,7 +1801,9 @@ static int transmit(conn *c) {
res = sendmsg(c->sfd, m, 0);
if (res > 0) {
+ STATS_LOCK();
stats.bytes_written += res;
+ STATS_UNLOCK();
/* We've written some of the data. Remove the completed
iovec entries from the list of pending writes. */
@@ -1762,14 +1861,16 @@ static void drive_machine(conn *c) {
addrlen = sizeof(addr);
if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ /* these are transient, so don't log anything */
stop = true;
- break;
} else if (errno == EMFILE) {
if (settings.verbose > 0)
fprintf(stderr, "Too many open connections\n");
accept_new_conns(false);
+ stop = true;
} else {
perror("accept()");
+ stop = true;
}
break;
}
@@ -1779,14 +1880,8 @@ static void drive_machine(conn *c) {
close(sfd);
break;
}
- if (conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
- DATA_BUFFER_SIZE, false) == NULL) {
- if (settings.verbose > 0)
- fprintf(stderr, "couldn't create new connection\n");
- close(sfd);
- break;
- }
-
+ dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
+ DATA_BUFFER_SIZE, false);
break;
case conn_read:
@@ -1826,7 +1921,9 @@ static void drive_machine(conn *c) {
/* now try reading from the socket */
res = read(c->sfd, c->ritem, c->rlbytes);
if (res > 0) {
+ STATS_LOCK();
stats.bytes_read += res;
+ STATS_UNLOCK();
c->ritem += res;
c->rlbytes -= res;
break;
@@ -1870,7 +1967,9 @@ static void drive_machine(conn *c) {
/* now try reading from the socket */
res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
if (res > 0) {
+ STATS_LOCK();
stats.bytes_read += res;
+ STATS_UNLOCK();
c->sbytes -= res;
break;
}
@@ -1955,7 +2054,6 @@ static void drive_machine(conn *c) {
stop = true;
break;
}
-
}
return;
@@ -1977,7 +2075,6 @@ void event_handler(const int fd, const short which, void *arg) {
return;
}
- /* do as much I/O as possible until we block */
drive_machine(c);
/* wait for next event */
@@ -2188,6 +2285,7 @@ static void clock_handler(const int fd, const short which, void *arg) {
}
evtimer_set(&clockevent, clock_handler, 0);
+ event_base_set(main_base, &clockevent);
evtimer_add(&clockevent, &t);
set_current_time();
@@ -2208,22 +2306,28 @@ static void delete_handler(const int fd, const short which, void *arg) {
}
evtimer_set(&deleteevent, delete_handler, 0);
+ event_base_set(main_base, &deleteevent);
evtimer_add(&deleteevent, &t);
- {
- int i=0, j=0;
- for (i=0; i<delcurr; i++) {
- item *it = todelete[i];
- if (item_delete_lock_over(it)) {
- assert(it->refcount > 0);
- it->it_flags &= ~ITEM_DELETED;
- item_unlink(it);
- item_remove(it);
- } else {
- todelete[j++] = it;
- }
+ run_deferred_deletes();
+}
+
+/* Call run_deferred_deletes instead of this. */
+void do_run_deferred_deletes(void)
+{
+ int i, j = 0;
+
+ for (i = 0; i < delcurr; i++) {
+ item *it = todelete[i];
+ if (item_delete_lock_over(it)) {
+ assert(it->refcount > 0);
+ it->it_flags &= ~ITEM_DELETED;
+ do_item_unlink(it);
+ do_item_remove(it);
+ } else {
+ todelete[j++] = it;
}
- delcurr = j;
}
+ delcurr = j;
}
static void usage(void) {
@@ -2247,6 +2351,9 @@ static void usage(void) {
"-P <file> save PID in <file>, only used with -d option\n"
"-f <factor> chunk size growth factor, default 1.25\n"
"-n <bytes> minimum space allocated for key+value+flags, default 48\n");
+#ifdef USE_THREADS
+ printf("-t <num> number of threads to use, default 4\n");
+#endif
return;
}
@@ -2356,7 +2463,6 @@ static void sig_handler(const int sig) {
int main (int argc, char **argv) {
int c;
- conn *u_conn;
struct in_addr addr;
bool lock_memory = false;
bool daemonize = false;
@@ -2377,7 +2483,7 @@ int main (int argc, char **argv) {
setbuf(stderr, NULL);
/* process arguments */
- while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:")) != -1) {
+ while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) {
switch (c) {
case 'U':
settings.udpport = atoi(optarg);
@@ -2446,6 +2552,21 @@ int main (int argc, char **argv) {
return 1;
}
break;
+ case 't':
+ settings.num_threads = atoi(optarg);
+ if (settings.num_threads == 0) {
+ fprintf(stderr, "Number of threads must be greater than 0\n");
+ return 1;
+ }
+ break;
+ case 'D':
+ if (! optarg || ! optarg[0]) {
+ fprintf(stderr, "No delimiter specified\n");
+ return 1;
+ }
+ settings.prefix_delimiter = optarg[0];
+ settings.detail_enabled = 1;
+ break;
default:
fprintf(stderr, "Illegal argument \"%c\"\n", c);
return 1;
@@ -2559,10 +2680,11 @@ int main (int argc, char **argv) {
}
}
+ /* initialize main thread libevent instance */
+ main_base = event_init();
/* initialize other stuff */
item_init();
- event_init();
stats_init();
assoc_init();
conn_init();
@@ -2599,16 +2721,16 @@ int main (int argc, char **argv) {
exit(EXIT_FAILURE);
}
/* create the initial listening connection */
- if (!(listen_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST, 1, false))) {
+ if (!(listen_conn = conn_new(l_socket, conn_listening,
+ EV_READ | EV_PERSIST, 1, false, main_base))) {
fprintf(stderr, "failed to create listening connection");
exit(EXIT_FAILURE);
}
- /* create the initial listening udp connection */
- if (u_socket > -1 &&
- !(u_conn = conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, UDP_READ_BUFFER_SIZE, true))) {
- fprintf(stderr, "failed to create udp connection");
- exit(EXIT_FAILURE);
- }
+ /* save the PID in if we're a daemon */
+ if (daemonize)
+ save_pid(getpid(), pid_file);
+ /* start up worker threads if MT mode */
+ thread_init(settings.num_threads, main_base);
/* initialise clock event */
clock_handler(0, 0, 0);
/* initialise deletion array and timer event */
@@ -2616,11 +2738,16 @@ int main (int argc, char **argv) {
delcurr = 0;
todelete = malloc(sizeof(item *) * deltotal);
delete_handler(0, 0, 0); /* sets up the event */
- /* save the PID in if we're a daemon */
- if (daemonize)
- save_pid(getpid(), pid_file);
- /* enter the loop */
- event_loop(0);
+ /* create the initial listening udp connection, monitored on all threads */
+ if (u_socket > -1) {
+ for (c = 0; c < settings.num_threads; c++) {
+ /* this is guaranteed to hit all threads because we round-robin */
+ dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST,
+ UDP_READ_BUFFER_SIZE, 1);
+ }
+ }
+ /* enter the event loop */
+ event_base_loop(main_base, 0);
/* remove the PID file if we're a daemon */
if (daemonize)
remove_pidfile(pid_file);
diff --git a/memcached.h b/memcached.h
index 2990f66..852e52e 100644
--- a/memcached.h
+++ b/memcached.h
@@ -1,5 +1,11 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* $Id$ */
+#include "config.h"
+#include <sys/types.h>
+#include <sys/time.h>
+#include <netinet/in.h>
+#include <event.h>
+
#define DATA_BUFFER_SIZE 2048
#define UDP_READ_BUFFER_SIZE 65536
#define UDP_MAX_PAYLOAD_SIZE 1400
@@ -69,6 +75,9 @@ struct settings {
char *socketpath; /* path to unix socket if using local socket */
double factor; /* chunk size growth factor */
int chunk_size;
+ int num_threads; /* number of libevent threads to run */
+ char prefix_delimiter; /* character that marks a key prefix (for stats) */
+ int detail_enabled; /* nonzero if we're collecting detailed stats */
};
extern struct stats stats;
@@ -199,6 +208,120 @@ extern volatile rel_time_t current_time;
* Functions
*/
+conn *do_conn_from_freelist();
+int do_conn_add_to_freelist(conn *c);
+char *do_defer_delete(item *item, time_t exptime);
+void do_run_deferred_deletes(void);
+char *do_add_delta(item *item, int incr, unsigned int delta, char *buf);
+int do_store_item(item *item, int comm);
+conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const bool is_udp, struct event_base *base);
+
+
+#include "stats.h"
#include "slabs.h"
#include "assoc.h"
#include "items.h"
+
+
+/*
+ * In multithreaded mode, we wrap certain functions with lock management and
+ * replace the logic of some other functions. All wrapped functions have
+ * "mt_" and "do_" variants. In multithreaded mode, the plain version of a
+ * function is #define-d to the "mt_" variant, which often just grabs a
+ * lock and calls the "do_" function. In singlethreaded mode, the "do_"
+ * function is called directly.
+ *
+ * Functions such as the libevent-related calls that need to do cross-thread
+ * communication in multithreaded mode (rather than actually doing the work
+ * in the current thread) are called via "dispatch_" frontends, which are
+ * also #define-d to directly call the underlying code in singlethreaded mode.
+ */
+#ifdef USE_THREADS
+
+void thread_init(int nthreads, struct event_base *main_base);
+int dispatch_event_add(int thread, conn *c);
+void dispatch_conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp);
+
+/* Lock wrappers for cache functions that are called from main loop. */
+char *mt_add_delta(item *item, int incr, unsigned int delta, char *buf);
+conn *mt_conn_from_freelist(void);
+int mt_conn_add_to_freelist(conn *c);
+char *mt_defer_delete(item *it, time_t exptime);
+int mt_is_listen_thread(void);
+item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
+void mt_item_flush_expired(void);
+item *mt_item_get_notedeleted(char *key, size_t nkey, int *delete_locked);
+item *mt_item_get_nocheck(char *key, size_t nkey);
+int mt_item_link(item *it);
+void mt_item_remove(item *it);
+int mt_item_replace(item *it, item *new_it);
+void mt_item_unlink(item *it);
+void mt_item_update(item *it);
+void mt_run_deferred_deletes(void);
+void *mt_slabs_alloc(size_t size);
+void mt_slabs_free(void *ptr, size_t size);
+int mt_slabs_reassign(unsigned char srcid, unsigned char dstid);
+char *mt_slabs_stats(int *buflen);
+void mt_stats_lock(void);
+void mt_stats_unlock(void);
+int mt_store_item(item *item, int comm);
+
+
+# define add_delta(x,y,z,a) mt_add_delta(x,y,z,a)
+# define assoc_move_next_bucket() mt_assoc_move_next_bucket()
+# define conn_from_freelist() mt_conn_from_freelist()
+# define conn_add_to_freelist(x) mt_conn_add_to_freelist(x)
+# define defer_delete(x,y) mt_defer_delete(x,y)
+# define is_listen_thread() mt_is_listen_thread()
+# define item_alloc(x,y,z,a,b) mt_item_alloc(x,y,z,a,b)
+# define item_flush_expired() mt_item_flush_expired()
+# define item_get_nocheck(x,y) mt_item_get_nocheck(x,y)
+# define item_get_notedeleted(x,y,z) mt_item_get_notedeleted(x,y,z)
+# define item_link(x) mt_item_link(x)
+# define item_remove(x) mt_item_remove(x)
+# define item_replace(x,y) mt_item_replace(x,y)
+# define item_update(x) mt_item_update(x)
+# define item_unlink(x) mt_item_unlink(x)
+# define run_deferred_deletes() mt_run_deferred_deletes()
+# define slabs_alloc(x) mt_slabs_alloc(x)
+# define slabs_free(x,y) mt_slabs_free(x,y)
+# define slabs_reassign(x,y) mt_slabs_reassign(x,y)
+# define slabs_stats(x) mt_slabs_stats(x)
+# define store_item(x,y) mt_store_item(x,y)
+
+# define STATS_LOCK() mt_stats_lock()
+# define STATS_UNLOCK() mt_stats_unlock()
+
+#else /* !USE_THREADS */
+
+# define add_delta(x,y,z,a) do_add_delta(x,y,z,a)
+# define assoc_move_next_bucket() do_assoc_move_next_bucket()
+# define conn_from_freelist() do_conn_from_freelist()
+# define conn_add_to_freelist(x) do_conn_add_to_freelist(x)
+# define defer_delete(x,y) do_defer_delete(x,y)
+# define dispatch_conn_new(x,y,z,a,b) conn_new(x,y,z,a,b,main_base)
+# define dispatch_event_add(t,c) event_add(&(c)->event, 0)
+# define is_listen_thread() 1
+# define item_alloc(x,y,z,a,b) do_item_alloc(x,y,z,a,b)
+# define item_flush_expired() do_item_flush_expired()
+# define item_get_nocheck(x,y) do_item_get_nocheck(x,y)
+# define item_get_notedeleted(x,y,z) do_item_get_notedeleted(x,y,z)
+# define item_link(x) do_item_link(x)
+# define item_remove(x) do_item_remove(x)
+# define item_replace(x,y) do_item_replace(x,y)
+# define item_unlink(x) do_item_unlink(x)
+# define item_update(x) do_item_update(x)
+# define run_deferred_deletes() do_run_deferred_deletes()
+# define slabs_alloc(x) do_slabs_alloc(x)
+# define slabs_free(x,y) do_slabs_free(x,y)
+# define slabs_reassign(x,y) do_slabs_reassign(x,y)
+# define slabs_stats(x) do_slabs_stats(x)
+# define store_item(x,y) do_store_item(x,y)
+# define thread_init(x,y) 0
+
+# define STATS_LOCK() /**/
+# define STATS_UNLOCK() /**/
+
+#endif /* !USE_THREADS */
+
+
diff --git a/slabs.c b/slabs.c
index 6cf2460..6b33a9e 100644
--- a/slabs.c
+++ b/slabs.c
@@ -9,10 +9,8 @@
*
* $Id$
*/
-#include "config.h"
-#include <sys/types.h>
+#include "memcached.h"
#include <sys/stat.h>
-#include <sys/time.h>
#include <sys/socket.h>
#include <sys/signal.h>
#include <sys/resource.h>
@@ -21,18 +19,15 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
-#include <netinet/in.h>
#include <errno.h>
-#include <event.h>
#include <assert.h>
#include <stdbool.h>
-#include "memcached.h"
-
#define POWER_SMALLEST 1
#define POWER_LARGEST 200
#define POWER_BLOCK 1048576
#define CHUNK_ALIGN_BYTES (sizeof(void *))
+#define DONT_PREALLOC_SLABS
/* powers-of-N allocation structures */
@@ -63,7 +58,7 @@ static int power_largest;
/*
* Forward Declarations
*/
-static int slabs_newslab(const unsigned int id);
+static int do_slabs_newslab(const unsigned int id);
#ifndef DONT_PREALLOC_SLABS
/* Preallocate as many slab pages as possible (called from slabs_init)
@@ -161,7 +156,7 @@ static void slabs_preallocate (const unsigned int maxslabs) {
for (i = POWER_SMALLEST; i <= POWER_LARGEST; i++) {
if (++prealloc > maxslabs)
return;
- slabs_newslab(i);
+ do_slabs_newslab(i);
}
}
@@ -179,7 +174,7 @@ static int grow_slab_list (const unsigned int id) {
return 1;
}
-static int slabs_newslab(const unsigned int id) {
+static int do_slabs_newslab(const unsigned int id) {
slabclass_t *p = &slabclass[id];
#ifdef ALLOW_SLABS_REASSIGN
int len = POWER_BLOCK;
@@ -206,7 +201,7 @@ static int slabs_newslab(const unsigned int id) {
}
/*@null@*/
-void *slabs_alloc(const size_t size) {
+void *do_slabs_alloc(const size_t size) {
slabclass_t *p;
unsigned int id = slabs_clsid(size);
@@ -225,7 +220,7 @@ void *slabs_alloc(const size_t size) {
/* fail unless we have space at the end of a recently allocated page,
we have something on our freelist, or we could allocate a new page */
- if (! (p->end_page_ptr != 0 || p->sl_curr != 0 || slabs_newslab(id) != 0))
+ if (! (p->end_page_ptr != 0 || p->sl_curr != 0 || do_slabs_newslab(id) != 0))
return 0;
/* return off our freelist, if we have one */
@@ -246,7 +241,7 @@ void *slabs_alloc(const size_t size) {
return NULL; /* shouldn't ever get here */
}
-void slabs_free(void *ptr, const size_t size) {
+void do_slabs_free(void *ptr, const size_t size) {
unsigned char id = slabs_clsid(size);
slabclass_t *p;
@@ -276,7 +271,7 @@ void slabs_free(void *ptr, const size_t size) {
}
/*@null@*/
-char* slabs_stats(int *buflen) {
+char* do_slabs_stats(int *buflen) {
int i, total;
char *buf = (char *)malloc(power_largest * 200 + 100);
char *bufcurr = buf;
@@ -318,7 +313,7 @@ char* slabs_stats(int *buflen) {
1 = success
0 = fail
-1 = tried. busy. send again shortly. */
-int slabs_reassign(unsigned char srcid, unsigned char dstid) {
+int do_slabs_reassign(unsigned char srcid, unsigned char dstid) {
void *slab, *slab_end;
slabclass_t *p, *dp;
void *iter;
diff --git a/slabs.h b/slabs.h
index 3ab538b..5d0c07a 100644
--- a/slabs.h
+++ b/slabs.h
@@ -14,17 +14,17 @@ void slabs_init(const size_t limit, const double factor);
unsigned int slabs_clsid(const size_t size);
/* Allocate object of given length. 0 on error */ /*@null@*/
-void *slabs_alloc(const size_t size);
+void *do_slabs_alloc(const size_t size);
/* Free previously allocated object */
-void slabs_free(void *ptr, size_t size);
+void do_slabs_free(void *ptr, size_t size);
/* Fill buffer with stats */ /*@null@*/
-char* slabs_stats(int *buflen);
+char* do_slabs_stats(int *buflen);
/* Request some slab be moved between classes
1 = success
0 = fail
-1 = tried. busy. send again shortly. */
-int slabs_reassign(unsigned char srcid, unsigned char dstid);
+int do_slabs_reassign(unsigned char srcid, unsigned char dstid);
diff --git a/t/stats.t b/t/stats.t
index f060fd2..1d8df8b 100755
--- a/t/stats.t
+++ b/t/stats.t
@@ -37,7 +37,7 @@ my $sock = $server->sock;
my $stats = mem_stats($sock);
# Test number of keys
-is(scalar(keys(%$stats)), 21, "21 stats values");
+is(scalar(keys(%$stats)), 22, "22 stats values");
# Test initial state
foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses bytes_written)) {