diff options
author | Steven Grimm <sgrimm@facebook.com> | 2007-04-16 15:34:03 +0000 |
---|---|---|
committer | Steven Grimm <sgrimm@facebook.com> | 2007-04-16 15:34:03 +0000 |
commit | 56b8339e0606c1e59987c8d6368dfe727f3914b8 (patch) | |
tree | 6a3b5a81e147e26321f88ff679d345cb3cea025f | |
parent | 5b304997cc5e19cc9eb6856b70716574492c2989 (diff) | |
download | memcached-56b8339e0606c1e59987c8d6368dfe727f3914b8.tar.gz |
Merge multithreaded into trunk, commit #2 (first commit only did the
new files, not the modified ones.)
git-svn-id: http://code.sixapart.com/svn/memcached/trunk/server@509 b0b603af-a30f-0410-a34e-baf09ae79d0b
-rw-r--r-- | ChangeLog | 21 | ||||
-rw-r--r-- | Makefile.am | 2 | ||||
-rw-r--r-- | assoc.c | 43 | ||||
-rw-r--r-- | assoc.h | 3 | ||||
-rw-r--r-- | configure.ac | 10 | ||||
-rw-r--r-- | doc/memcached.1 | 12 | ||||
-rw-r--r-- | items.c | 119 | ||||
-rw-r--r-- | items.h | 18 | ||||
-rw-r--r-- | memcached.c | 497 | ||||
-rw-r--r-- | memcached.h | 123 | ||||
-rw-r--r-- | slabs.c | 25 | ||||
-rw-r--r-- | slabs.h | 8 | ||||
-rwxr-xr-x | t/stats.t | 2 |
13 files changed, 622 insertions, 261 deletions
@@ -69,6 +69,17 @@ * Explicitly compare against NULL or zero in many places. +2007-03-05 + * Steven Grimm <sgrimm@facebook.com>: Per-object-type stats collection + support. Specify the object type delimiter with the -D command line + option. Turn stats gathering on and off with "stats detail on" and + "stats detail off". Dump the per-object-type details with + "stats detail dump". + +2007-03-01 + * Steven Grimm <sgrimm@facebook.com>: Fix an off-by-one error in the + multithreaded version's message passing code. + 2006-12-23 * fix expirations of items set with absolute expiration times in the past, before the server's start time. bug was introduced in @@ -97,10 +108,20 @@ * Steve Peters <steve@fisharerojo.org>: OpenBSD has a malloc.h, but warns to use stdlib.h instead +2006-11-22 + * Steven Grimm <sgrimm@facebook.com>: Add support for multithreaded + execution. Run configure with "--enable-threads" to enable. See + doc/threads.txt for details. + 2006-11-13 * Iain Wade <iwade@optusnet.com.au>: Fix for UDP responses on non-"get" commands. +2006-10-15 + * Steven Grimm <sgrimm@facebook.com>: Dynamic sizing of hashtable to + reduce collisions on very large caches and conserve memory on + small caches. + 2006-10-13 * Steven Grimm <sgrimm@facebook.com>: New faster hash function. diff --git a/Makefile.am b/Makefile.am index 98724eb..11c006c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,6 +1,6 @@ bin_PROGRAMS = memcached memcached-debug -memcached_SOURCES = memcached.c slabs.c slabs.h items.c items.h assoc.c assoc.h memcached.h +memcached_SOURCES = memcached.c slabs.c slabs.h items.c items.h assoc.c assoc.h memcached.h thread.c stats.c stats.h memcached_debug_SOURCES = $(memcached_SOURCES) memcached_CPPFLAGS = -DNDEBUG memcached_LDADD = @LIBOBJS@ @@ -12,10 +12,9 @@ * * $Id$ */ -#include "config.h" -#include <sys/types.h> + +#include "memcached.h" #include <sys/stat.h> -#include <sys/time.h> #include <sys/socket.h> #include <sys/signal.h> #include <sys/resource.h> @@ -24,13 +23,9 @@ #include <stdio.h> #include <string.h> #include <unistd.h> -#include <netinet/in.h> #include <errno.h> -#include <event.h> #include <assert.h> -#include "memcached.h" - /* * Since the hash function does bit manipulation, it needs to know * whether it's big or little-endian. ENDIAN_LITTLE and ENDIAN_BIG @@ -142,7 +137,7 @@ and these came close: } #if HASH_LITTLE_ENDIAN == 1 -static uint32_t hash( +uint32_t hash( const void *key, /* the key to hash */ size_t length, /* length of the key */ const uint32_t initval) /* initval */ @@ -323,7 +318,7 @@ static uint32_t hash( * from hashlittle() on all machines. hashbig() takes advantage of * big-endian byte ordering. */ -static uint32_t hash( const void *key, size_t length, const uint32_t initval) +uint32_t hash( const void *key, size_t length, const uint32_t initval) { uint32_t a,b,c; union { const void *ptr; size_t i; } u; /* to cast key to (size_t) happily */ @@ -541,39 +536,41 @@ static void assoc_expand(void) { primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *)); if (primary_hashtable) { - if (settings.verbose > 1) - fprintf(stderr, "Hash table expansion starting\n"); + if (settings.verbose > 1) + fprintf(stderr, "Hash table expansion starting\n"); hashpower++; expanding = 1; expand_bucket = 0; - assoc_move_next_bucket(); + do_assoc_move_next_bucket(); } else { primary_hashtable = old_hashtable; - /* Bad news, but we can keep running. */ + /* Bad news, but we can keep running. */ } } /* migrates the next bucket to the primary hashtable if we're expanding. */ -void assoc_move_next_bucket(void) { +void do_assoc_move_next_bucket(void) { item *it, *next; int bucket; if (expanding) { for (it = old_hashtable[expand_bucket]; NULL != it; it = next) { - next = it->h_next; + next = it->h_next; bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower); it->h_next = primary_hashtable[bucket]; primary_hashtable[bucket] = it; - } + } - expand_bucket++; - if (expand_bucket == hashsize(hashpower - 1)) { - expanding = 0; - free(old_hashtable); - if (settings.verbose > 1) - fprintf(stderr, "Hash table expansion done\n"); - } + old_hashtable[expand_bucket] = NULL; + + expand_bucket++; + if (expand_bucket == hashsize(hashpower - 1)) { + expanding = 0; + free(old_hashtable); + if (settings.verbose > 1) + fprintf(stderr, "Hash table expansion done\n"); + } } } @@ -3,4 +3,5 @@ void assoc_init(void); item *assoc_find(const char *key, const size_t nkey); int assoc_insert(item *item); void assoc_delete(const char *key, const size_t nkey); -void assoc_move_next_bucket(void); +void do_assoc_move_next_bucket(void); +uint32_t hash( const void *key, size_t length, const uint32_t initval); diff --git a/configure.ac b/configure.ac index 19caccb..f45f7cf 100644 --- a/configure.ac +++ b/configure.ac @@ -94,6 +94,7 @@ dnl ---------------------------------------------------------------------------- AC_SEARCH_LIBS(socket, socket) AC_SEARCH_LIBS(gethostbyname, nsl) AC_SEARCH_LIBS(mallinfo, malloc) +AC_SEARCH_LIBS(pthread_create, pthread) AC_CHECK_FUNC(daemon,AC_DEFINE([HAVE_DAEMON],,[Define this if you have daemon()]),[AC_LIBOBJ(daemon)]) @@ -156,6 +157,15 @@ fi AC_C_ENDIAN +dnl Check whether the user wants threads or not +AC_ARG_ENABLE(threads, + [AS_HELP_STRING([--enable-threads],[support multithreaded execution])], + [if test "$ac_cv_search_pthread_create" != "no"; then + AC_DEFINE([USE_THREADS],,[Define this if you want to use pthreads]) + else + AC_MSG_ERROR([Can't enable threads without the POSIX thread library.]) + fi]) + AC_CHECK_FUNCS(mlockall) AC_CONFIG_FILES(Makefile doc/Makefile) diff --git a/doc/memcached.1 b/doc/memcached.1 index dbd1c8e..fee6975 100644 --- a/doc/memcached.1 +++ b/doc/memcached.1 @@ -84,6 +84,18 @@ Print memcached and libevent licenses. .TP .B \-P <filename> Print pidfile to <filename>, only used under -d option. +.TP +.B \-t <threads> +Number of threads to use to process incoming requests. This option is only +meaningful if memcached was compiled with thread support enabled. It is +typically not useful to set this higher than the number of CPU cores on the +memcached server. +.TP +.B \-D <char> +Use <char> as the delimiter between key prefixes and IDs. This is used for +per-prefix stats reporting. The default is ":" (colon). If this option is +specified, stats collection is turned on automatically; if not, then it may +be turned on by sending the "stats detail on" command to the server. .br .SH LICENSE The memcached daemon is copyright Danga Interactive and is distributed under @@ -1,8 +1,7 @@ /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* $Id$ */ -#include <sys/types.h> +#include "memcached.h" #include <sys/stat.h> -#include <sys/time.h> #include <sys/socket.h> #include <sys/signal.h> #include <sys/resource.h> @@ -11,14 +10,10 @@ #include <stdio.h> #include <string.h> #include <unistd.h> -#include <netinet/in.h> #include <errno.h> #include <time.h> -#include <event.h> #include <assert.h> -#include "memcached.h" - /* Forward Declarations */ static void item_link_q(item *it); static void item_unlink_q(item *it); @@ -44,6 +39,17 @@ void item_init(void) { } } +/* Enable this for reference-count debugging. */ +#if 0 +# define DEBUG_REFCNT(it,op) \ + fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \ + it, op, it->refcount, \ + (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \ + (it->it_flags & ITEM_SLABBED) ? 'S' : ' ', \ + (it->it_flags & ITEM_DELETED) ? 'D' : ' ') +#else +# define DEBUG_REFCNT(it,op) while(0) +#endif /* * Generates the variable-sized part of the header for an object. @@ -65,7 +71,7 @@ static size_t item_make_header(const uint8_t nkey, const int flags, const int nb } /*@null@*/ -item *item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes) { +item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes) { uint8_t nsuffix; item *it; char suffix[40]; @@ -98,9 +104,12 @@ item *item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t for (search = tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) { if (search->refcount == 0) { - if (search->exptime > current_time) + if (search->exptime > current_time) { + STATS_LOCK(); stats.evictions++; - item_unlink(search); + STATS_UNLOCK(); + } + do_item_unlink(search); break; } } @@ -115,7 +124,8 @@ item *item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t assert(it != heads[it->slabs_clsid]); it->next = it->prev = it->h_next = 0; - it->refcount = 0; + it->refcount = 1; /* the caller will have a reference */ + DEBUG_REFCNT(it, '*'); it->it_flags = 0; it->nkey = nkey; it->nbytes = nbytes; @@ -136,6 +146,7 @@ void item_free(item *it) { /* so slab size changer can tell later if item is already free or not */ it->slabs_clsid = 0; it->it_flags |= ITEM_SLABBED; + DEBUG_REFCNT(it, 'F'); slabs_free(it, ntotal); } @@ -192,57 +203,66 @@ static void item_unlink_q(item *it) { return; } -int item_link(item *it) { +int do_item_link(item *it) { assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0); assert(it->nbytes < 1048576); it->it_flags |= ITEM_LINKED; it->time = current_time; assoc_insert(it); + STATS_LOCK(); stats.curr_bytes += ITEM_ntotal(it); stats.curr_items += 1; stats.total_items += 1; + STATS_UNLOCK(); item_link_q(it); return 1; } -void item_unlink(item *it) { +void do_item_unlink(item *it) { if ((it->it_flags & ITEM_LINKED) != 0) { it->it_flags &= ~ITEM_LINKED; + STATS_LOCK(); stats.curr_bytes -= ITEM_ntotal(it); stats.curr_items -= 1; + STATS_UNLOCK(); assoc_delete(ITEM_key(it), it->nkey); item_unlink_q(it); + if (it->refcount == 0) item_free(it); } - if (it->refcount == 0) item_free(it); } -void item_remove(item *it) { +void do_item_remove(item *it) { assert((it->it_flags & ITEM_SLABBED) == 0); - if (it->refcount != 0) it->refcount--; + if (it->refcount != 0) { + it->refcount--; + DEBUG_REFCNT(it, '-'); + } assert((it->it_flags & ITEM_DELETED) == 0 || it->refcount != 0); if (it->refcount == 0 && (it->it_flags & ITEM_LINKED) == 0) { item_free(it); } } -void item_update(item *it) { +void do_item_update(item *it) { if (it->time < current_time - ITEM_UPDATE_INTERVAL) { assert((it->it_flags & ITEM_SLABBED) == 0); - item_unlink_q(it); - it->time = current_time; - item_link_q(it); + if (it->it_flags & ITEM_LINKED) { + item_unlink_q(it); + it->time = current_time; + item_link_q(it); + } } } -int item_replace(item *it, item *new_it) { +int do_item_replace(item *it, item *new_it) { assert((it->it_flags & ITEM_SLABBED) == 0); - item_unlink(it); - return item_link(new_it); + do_item_unlink(it); + return do_item_link(new_it); } /*@null@*/ @@ -337,8 +357,59 @@ char* item_stats_sizes(int *bytes) { return buf; } +/* returns true if a deleted item's delete-locked-time is over, and it + should be removed from the namespace */ +int item_delete_lock_over (item *it) { + assert(it->it_flags & ITEM_DELETED); + return (current_time >= it->exptime); +} + +/* wrapper around assoc_find which does the lazy expiration/deletion logic */ +item *do_item_get_notedeleted(char *key, size_t nkey, int *delete_locked) { + item *it = assoc_find(key, nkey); + if (delete_locked) *delete_locked = 0; + if (it && (it->it_flags & ITEM_DELETED)) { + /* it's flagged as delete-locked. let's see if that condition + is past due, and the 5-second delete_timer just hasn't + gotten to it yet... */ + if (! item_delete_lock_over(it)) { + if (delete_locked) *delete_locked = 1; + it = 0; + } + } + if (it && settings.oldest_live && settings.oldest_live <= current_time && + it->time <= settings.oldest_live) { + do_item_unlink(it); // MTSAFE - cache_lock held + it = 0; + } + if (it && it->exptime && it->exptime <= current_time) { + do_item_unlink(it); // MTSAFE - cache_lock held + it = 0; + } + + if (it) { + it->refcount++; + DEBUG_REFCNT(it, '+'); + } + return it; +} + +item *item_get(char *key, size_t nkey) { + return item_get_notedeleted(key, nkey, 0); +} + +/* returns an item whether or not it's delete-locked or expired. */ +item *do_item_get_nocheck(char *key, size_t nkey) { + item *it = assoc_find(key, nkey); + if (it) { + it->refcount++; + DEBUG_REFCNT(it, '+'); + } + return it; +} + /* expires items that are more recent than the oldest_live setting. */ -void item_flush_expired(void) { +void do_item_flush_expired(void) { int i; item *iter, *next; if (settings.oldest_live == 0) @@ -353,7 +424,7 @@ void item_flush_expired(void) { if (iter->time >= settings.oldest_live) { next = iter->next; if ((iter->it_flags & ITEM_SLABBED) == 0) { - item_unlink(iter); + do_item_unlink(iter); } } else { /* We've hit the first old item. Continue to the next queue. */ @@ -1,15 +1,15 @@ /* See items.c */ void item_init(void); /*@null@*/ -item *item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes); +item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes); void item_free(item *it); bool item_size_ok(const size_t nkey, const int flags, const int nbytes); -int item_link(item *it); /* may fail if transgresses limits */ -void item_unlink(item *it); -void item_remove(item *it); -void item_update(item *it); /* update LRU time to current and reposition */ -int item_replace(item *it, item *new_it); +int do_item_link(item *it); /* may fail if transgresses limits */ +void do_item_unlink(item *it); +void do_item_remove(item *it); +void do_item_update(item *it); /* update LRU time to current and reposition */ +int do_item_replace(item *it, item *new_it); /*@null@*/ char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes); @@ -17,4 +17,8 @@ void item_stats(char *buffer, const int buflen); /*@null@*/ char *item_stats_sizes(int *bytes); -void item_flush_expired(void); +void do_item_flush_expired(void); +item *item_get(char *key, size_t nkey); + +item *do_item_get_notedeleted(char *key, size_t nkey, int *delete_locked); +item *do_item_get_nocheck(char *key, size_t nkey); diff --git a/memcached.c b/memcached.c index 4ce15c0..0f303ce 100644 --- a/memcached.c +++ b/memcached.c @@ -15,10 +15,8 @@ * * $Id$ */ -#include "config.h" -#include <sys/types.h> +#include "memcached.h" #include <sys/stat.h> -#include <sys/time.h> #include <sys/socket.h> #include <sys/un.h> #include <sys/signal.h> @@ -41,12 +39,10 @@ #include <stdio.h> #include <string.h> #include <unistd.h> -#include <netinet/in.h> #include <netinet/tcp.h> #include <arpa/inet.h> #include <errno.h> #include <time.h> -#include <event.h> #include <assert.h> #include <limits.h> @@ -64,8 +60,6 @@ #endif #endif -#include "memcached.h" - /* * forward declarations */ @@ -85,7 +79,6 @@ static void settings_init(void); /* event handling, network IO */ static void event_handler(const int fd, const short which, void *arg); -static conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const bool is_udp); static void conn_close(conn *c); static void conn_init(void); static void accept_new_conns(const bool do_accept); @@ -115,6 +108,7 @@ static item **todelete = 0; static int delcurr; static int deltotal; static conn *listen_conn; +static struct event_base *main_base; #define TRANSMIT_COMPLETE 0 #define TRANSMIT_INCOMPLETE 1 @@ -159,12 +153,16 @@ static void stats_init(void) { like 'settings.oldest_live' which act as booleans as well as values are now false in boolean context... */ stats.started = time(0) - 2; + stats_prefix_init(); } static void stats_reset(void) { + STATS_LOCK(); stats.total_items = stats.total_conns = 0; stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = 0; stats.bytes_read = stats.bytes_written = 0; + stats_prefix_clear(); + STATS_UNLOCK(); } static void settings_init(void) { @@ -180,6 +178,13 @@ static void settings_init(void) { settings.managed = false; settings.factor = 1.25; settings.chunk_size = 48; /* space for a modest key and value */ +#ifdef USE_THREADS + settings.num_threads = 4; +#else + settings.num_threads = 1; +#endif + settings.prefix_delimiter = ':'; + settings.detail_enabled = 0; } /* returns true if a deleted item's delete-locked-time is over, and it @@ -189,37 +194,6 @@ static bool item_delete_lock_over (item *it) { return (current_time >= it->exptime); } -/* wrapper around assoc_find which does the lazy expiration/deletion logic */ -static item *get_item_notedeleted(const char *key, const size_t nkey, int *delete_locked) { - item *it = assoc_find(key, nkey); - - if (delete_locked) *delete_locked = 0; - - if (it != NULL && (it->it_flags & ITEM_DELETED)) { - /* it's flagged as delete-locked. let's see if that condition - is past due, and the 5-second delete_timer just hasn't - gotten to it yet... */ - if (! item_delete_lock_over(it)) { - if (delete_locked) *delete_locked = 1; - it = 0; - } - } - if (it != NULL && settings.oldest_live != 0 && settings.oldest_live <= current_time && - it->time <= settings.oldest_live) { - item_unlink(it); - it = 0; - } - if (it != NULL && it->exptime != 0 && it->exptime <= current_time) { - item_unlink(it); - it = 0; - } - return it; -} - -static item *get_item(const char *key, const size_t nkey) { - return get_item_notedeleted(key, nkey, 0); -} - /* * Adds a message header to a connection. * @@ -260,10 +234,16 @@ static int add_msghdr(conn *c) return 0; } + +/* + * Free list management for connections. + */ + static conn **freeconns; static int freetotal; static int freecurr; + static void conn_init(void) { freetotal = 200; freecurr = 0; @@ -273,15 +253,48 @@ static void conn_init(void) { return; } -/*@null@*/ -static conn *conn_new(const int sfd, const int init_state, const int event_flags, - const int read_buffer_size, const bool is_udp) { +/* + * Returns a connection from the freelist, if any. Should call this using + * conn_from_freelist() for thread safety. + */ +conn *do_conn_from_freelist() { conn *c; - /* do we have a free conn structure from a previous close? */ if (freecurr > 0) { c = freeconns[--freecurr]; - } else { /* allocate a new one */ + } else { + c = NULL; + } + + return c; +} + +/* + * Adds a connection to the freelist. 0 = success. Should call this using + * conn_add_to_freelist() for thread safety. + */ +int do_conn_add_to_freelist(conn *c) { + if (freecurr < freetotal) { + freeconns[freecurr++] = c; + return 0; + } else { + /* try to enlarge free connections array */ + conn **new_freeconns = realloc(freeconns, sizeof(conn *) * freetotal * 2); + if (new_freeconns) { + freetotal *= 2; + freeconns = new_freeconns; + freeconns[freecurr++] = c; + return 0; + } + } + return 1; +} + +conn *conn_new(const int sfd, const int init_state, const int event_flags, + const int read_buffer_size, const bool is_udp, struct event_base *base) { + conn *c = conn_from_freelist(); + + if (NULL == c) { if (!(c = (conn *)malloc(sizeof(conn)))) { perror("malloc()"); return NULL; @@ -317,7 +330,9 @@ static conn *conn_new(const int sfd, const int init_state, const int event_flags return NULL; } + STATS_LOCK(); stats.conn_structs++; + STATS_UNLOCK(); } if (settings.verbose > 1) { @@ -350,26 +365,20 @@ static conn *conn_new(const int sfd, const int init_state, const int event_flags c->gen = 0; event_set(&c->event, sfd, event_flags, event_handler, (void *)c); + event_base_set(base, &c->event); c->ev_flags = event_flags; if (event_add(&c->event, 0) == -1) { - if (freecurr < freetotal) { - freeconns[freecurr++] = c; - } else { - if (c->hdrbuf) - free (c->hdrbuf); - free (c->msglist); - free (c->rbuf); - free (c->wbuf); - free (c->ilist); - free (c->iov); - free (c); + if (conn_add_to_freelist(c)) { + conn_free(c); } return NULL; } + STATS_LOCK(); stats.curr_conns++; stats.total_conns++; + STATS_UNLOCK(); return c; } @@ -378,7 +387,7 @@ static void conn_cleanup(conn *c) { assert(c != NULL); if (c->item) { - item_free(c->item); + item_remove(c->item); c->item = 0; } @@ -397,7 +406,7 @@ static void conn_cleanup(conn *c) { /* * Frees a connection. */ -static void conn_free(conn *c) { +void conn_free(conn *c) { if (c) { if (c->hdrbuf) free(c->hdrbuf); @@ -429,24 +438,13 @@ static void conn_close(conn *c) { conn_cleanup(c); /* if the connection has big buffers, just free it */ - if (c->rsize > READ_BUFFER_HIGHWAT) { + if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) { conn_free(c); - } else if (freecurr < freetotal) { - /* if we have enough space in the free connections array, put the structure there */ - freeconns[freecurr++] = c; - } else { - /* try to enlarge free connections array */ - conn **new_freeconns = realloc((void *)freeconns, sizeof(conn *) * freetotal * 2); - if (new_freeconns) { - freetotal *= 2; - freeconns = new_freeconns; - freeconns[freecurr++] = c; - } else { - conn_free(c); - } } + STATS_LOCK(); stats.curr_conns--; + STATS_UNLOCK(); return; } @@ -689,56 +687,63 @@ static void complete_nread(conn *c) { item *it = c->item; int comm = c->item_comm; - item *old_it; - int delete_locked = 0; - char *key = ITEM_key(it); + STATS_LOCK(); stats.set_cmds++; + STATS_UNLOCK(); if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) { out_string(c, "CLIENT_ERROR bad data chunk"); - goto err; + } else { + if (store_item(it, comm)) { + out_string(c, "STORED"); + } else { + out_string(c, "NOT_STORED"); + } } - old_it = get_item_notedeleted(key, it->nkey, &delete_locked); - - if (old_it != NULL && comm == NREAD_ADD) { - item_update(old_it); /* touches item, promotes to head of LRU */ - out_string(c, "NOT_STORED"); - goto err; - } + item_remove(c->item); /* release the c->item reference */ + c->item = 0; +} - if (old_it == NULL && comm == NREAD_REPLACE) { - out_string(c, "NOT_STORED"); - goto err; - } +/* + * Stores an item in the cache according to the semantics of one of the set + * commands. In threaded mode, this is protected by the cache lock. + * + * Returns true if the item was stored. + */ +int do_store_item(item *it, int comm) { + char *key = ITEM_key(it); + int delete_locked = 0; + item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked); + int stored = 0; - if (delete_locked != 0) { - if (comm == NREAD_REPLACE || comm == NREAD_ADD) { - out_string(c, "NOT_STORED"); - goto err; - } + if (old_it != NULL && comm == NREAD_ADD) { + /* add only adds a nonexistent item, but promote to head of LRU */ + do_item_update(old_it); + } else if (!old_it && comm == NREAD_REPLACE) { + /* replace only replaces an existing value; don't store */ + } else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD)) { + /* replace and add can't override delete locks; don't store */ + } else { + /* "set" commands can override the delete lock + window... in which case we have to find the old hidden item + that's in the namespace/LRU but wasn't returned by + item_get.... because we need to replace it */ + if (delete_locked) + old_it = do_item_get_nocheck(key, it->nkey); + + if (old_it != NULL) + do_item_replace(old_it, it); + else + do_item_link(it); - /* but "set" commands can override the delete lock - window... in which case we have to find the old hidden item - that's in the namespace/LRU but wasn't returned by - get_item.... because we need to replace it (below) */ - old_it = assoc_find(key, it->nkey); + stored = 1; } if (old_it) - item_replace(old_it, it); - else - item_link(it); - - c->item = 0; - out_string(c, "STORED"); - return; - -err: - item_free(it); - c->item = 0; - return; + do_item_remove(old_it); /* release our reference */ + return stored; } typedef struct token_s { @@ -816,6 +821,36 @@ static size_t tokenize_command(char *command, token_t *tokens, const size_t max_ return ntokens; } +inline void process_stats_detail(conn *c, const char *command) { + assert(c != NULL); + + if (strcmp(command, "on") == 0) { + settings.detail_enabled = 1; + out_string(c, "OK"); + } + else if (strcmp(command, "off") == 0) { + settings.detail_enabled = 0; + out_string(c, "OK"); + } + else if (strcmp(command, "dump") == 0) { + int len; + char *stats = stats_prefix_dump(&len); + if (NULL != stats) { + c->write_and_free = stats; + c->wcurr = stats; + c->wbytes = len; + conn_set_state(c, conn_write); + c->write_and_go = conn_read; + } + else { + out_string(c, "SERVER_ERROR"); + } + } + else { + out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump"); + } +} + static void process_stat(conn *c, token_t *tokens, const size_t ntokens) { rel_time_t now = current_time; char *command; @@ -838,6 +873,7 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) { getrusage(RUSAGE_SELF, &usage); + STATS_LOCK(); pos += sprintf(pos, "STAT pid %u\r\n", pid); pos += sprintf(pos, "STAT uptime %u\r\n", now); pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started); @@ -859,7 +895,9 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) { pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read); pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written); pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (unsigned long long)settings.maxbytes); + pos += sprintf(pos, "STAT threads %u\r\n", settings.num_threads); pos += sprintf(pos, "END"); + STATS_UNLOCK(); out_string(c, temp); return; } @@ -989,6 +1027,14 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) { return; } + if (strcmp(subcommand, "detail") == 0) { + if (ntokens < 4) + process_stats_detail(c, ""); /* outputs the error message */ + else + process_stats_detail(c, tokens[2].value); + return; + } + if (strcmp(subcommand, "sizes") == 0) { int bytes = 0; char *buf = item_stats_sizes(&bytes); @@ -1042,8 +1088,13 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens) return; } + STATS_LOCK(); stats.get_cmds++; - it = get_item(key, nkey); + STATS_UNLOCK(); + it = item_get(key, nkey); + if (settings.detail_enabled) { + stats_prefix_record_get(key, NULL != it); + } if (it) { if (i >= c->isize) { item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2); @@ -1069,13 +1120,19 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens) if (settings.verbose > 1) fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it)); + /* item_get() has incremented it->refcount for us */ + STATS_LOCK(); stats.get_hits++; - it->refcount++; + STATS_UNLOCK(); item_update(it); *(c->ilist + i) = it; i++; - } else stats.get_misses++; + } else { + STATS_LOCK(); + stats.get_misses++; + STATS_UNLOCK(); + } key_token++; } @@ -1135,6 +1192,10 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken return; } + if (settings.detail_enabled) { + stats_prefix_record_set(key); + } + if (settings.managed) { int bucket = c->bucket; if (bucket == -1) { @@ -1166,18 +1227,14 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken c->ritem = ITEM_data(it); c->rlbytes = it->nbytes; conn_set_state(c, conn_nread); - return; } static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const int incr) { char temp[32]; - unsigned int value; item *it; unsigned int delta; char *key; size_t nkey; - int res; - char *ptr; assert(c != NULL); @@ -1202,12 +1259,6 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt } } - it = get_item(key, nkey); - if (!it) { - out_string(c, "NOT_FOUND"); - return; - } - delta = strtoul(tokens[2].value, NULL, 10); if(errno == ERANGE) { @@ -1215,14 +1266,38 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt return; } + it = item_get(key, nkey); + if (!it) { + out_string(c, "NOT_FOUND"); + return; + } + + out_string(c, add_delta(it, incr, delta, temp)); + item_remove(it); /* release our reference */ +} + +/* + * adds a delta value to a numeric item. + * + * it item to adjust + * incr true to increment value, false to decrement + * delta amount to adjust value by + * buf buffer for response string + * + * returns a response string to send back to the client. + */ +char *do_add_delta(item *it, int incr, unsigned int delta, char *buf) { + char *ptr; + unsigned int value; + int res; + ptr = ITEM_data(it); while ((*ptr != '\0') && (*ptr < '0' && *ptr > '9')) ptr++; // BUG: can't be true value = strtol(ptr, NULL, 10); if(errno == ERANGE) { - out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value"); - return; + return "CLIENT_ERROR cannot increment or decrement non-numeric value"; } if (incr != 0) @@ -1231,24 +1306,24 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt if (delta >= value) value = 0; else value -= delta; } - snprintf(temp, 32, "%u", value); - res = strlen(temp); + snprintf(buf, 32, "%u", value); + res = strlen(buf); if (res + 2 > it->nbytes) { /* need to realloc */ item *new_it; - new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 ); + new_it = do_item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 ); if (new_it == 0) { - out_string(c, "SERVER_ERROR out of memory"); - return; + return "SERVER_ERROR out of memory"; } - memcpy(ITEM_data(new_it), temp, res); + memcpy(ITEM_data(new_it), buf, res); memcpy(ITEM_data(new_it) + res, "\r\n", 3); - item_replace(it, new_it); + do_item_replace(it, new_it); + do_item_remove(new_it); /* release our reference */ } else { /* replace in-place */ - memcpy(ITEM_data(it), temp, res); + memcpy(ITEM_data(it), buf, res); memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2); } - out_string(c, temp); - return; + + return buf; } static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) { @@ -1289,17 +1364,32 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken } } - it = get_item(key, nkey); - if (!it) { - out_string(c, "NOT_FOUND"); - return; + if (settings.detail_enabled) { + stats_prefix_record_delete(key); } - if (exptime == 0) { - item_unlink(it); - out_string(c, "DELETED"); - return; + it = item_get(key, nkey); + if (it) { + if (exptime == 0) { + item_unlink(it); + item_remove(it); /* release our reference */ + out_string(c, "DELETED"); + } else { + /* our reference will be transfered to the delete queue */ + out_string(c, defer_delete(it, exptime)); + } + } else { + out_string(c, "NOT_FOUND"); } +} + +/* + * Adds an item to the deferred-delete list so it can be reaped later. + * + * Returns the result to send to the client. + */ +char *do_defer_delete(item *it, time_t exptime) +{ if (delcurr >= deltotal) { item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2); if (new_delete) { @@ -1310,18 +1400,17 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken * can't delete it immediately, user wants a delay, * but we ran out of memory for the delete queue */ - out_string(c, "SERVER_ERROR out of memory"); - return; + item_remove(it); /* release reference */ + return "SERVER_ERROR out of memory"; } } - it->refcount++; /* use its expiration time as its deletion time now */ it->exptime = realtime(exptime); it->it_flags |= ITEM_DELETED; todelete[delcurr++] = it; - out_string(c, "DELETED"); - return; + + return "DELETED"; } static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) { @@ -1565,7 +1654,9 @@ static int try_read_udp(conn *c) { 0, &c->request_addr, &c->request_addr_size); if (res > 8) { unsigned char *buf = (unsigned char *)c->rbuf; + STATS_LOCK(); stats.bytes_read += res; + STATS_UNLOCK(); /* Beginning of UDP packet is the request ID; save it. */ c->request_id = buf[0] * 256 + buf[1]; @@ -1632,7 +1723,9 @@ static int try_read_network(conn *c) { res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes); if (res > 0) { + STATS_LOCK(); stats.bytes_read += res; + STATS_UNLOCK(); gotdata = 1; c->rbytes += res; continue; @@ -1653,10 +1746,12 @@ static int try_read_network(conn *c) { static bool update_event(conn *c, const int new_flags) { assert(c != NULL); + struct event_base *base = c->event.ev_base; if (c->ev_flags == new_flags) return true; if (event_del(&c->event) == -1) return false; event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c); + event_base_set(base, &c->event); c->ev_flags = new_flags; if (event_add(&c->event, 0) == -1) return false; return true; @@ -1666,6 +1761,8 @@ static bool update_event(conn *c, const int new_flags) { * Sets whether we are listening for new connections or not. */ void accept_new_conns(const bool do_accept) { + if (! is_listen_thread()) + return; if (do_accept) { update_event(listen_conn, EV_READ | EV_PERSIST); if (listen(listen_conn->sfd, 1024) != 0) { @@ -1704,7 +1801,9 @@ static int transmit(conn *c) { res = sendmsg(c->sfd, m, 0); if (res > 0) { + STATS_LOCK(); stats.bytes_written += res; + STATS_UNLOCK(); /* We've written some of the data. Remove the completed iovec entries from the list of pending writes. */ @@ -1762,14 +1861,16 @@ static void drive_machine(conn *c) { addrlen = sizeof(addr); if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { + /* these are transient, so don't log anything */ stop = true; - break; } else if (errno == EMFILE) { if (settings.verbose > 0) fprintf(stderr, "Too many open connections\n"); accept_new_conns(false); + stop = true; } else { perror("accept()"); + stop = true; } break; } @@ -1779,14 +1880,8 @@ static void drive_machine(conn *c) { close(sfd); break; } - if (conn_new(sfd, conn_read, EV_READ | EV_PERSIST, - DATA_BUFFER_SIZE, false) == NULL) { - if (settings.verbose > 0) - fprintf(stderr, "couldn't create new connection\n"); - close(sfd); - break; - } - + dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, + DATA_BUFFER_SIZE, false); break; case conn_read: @@ -1826,7 +1921,9 @@ static void drive_machine(conn *c) { /* now try reading from the socket */ res = read(c->sfd, c->ritem, c->rlbytes); if (res > 0) { + STATS_LOCK(); stats.bytes_read += res; + STATS_UNLOCK(); c->ritem += res; c->rlbytes -= res; break; @@ -1870,7 +1967,9 @@ static void drive_machine(conn *c) { /* now try reading from the socket */ res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize); if (res > 0) { + STATS_LOCK(); stats.bytes_read += res; + STATS_UNLOCK(); c->sbytes -= res; break; } @@ -1955,7 +2054,6 @@ static void drive_machine(conn *c) { stop = true; break; } - } return; @@ -1977,7 +2075,6 @@ void event_handler(const int fd, const short which, void *arg) { return; } - /* do as much I/O as possible until we block */ drive_machine(c); /* wait for next event */ @@ -2188,6 +2285,7 @@ static void clock_handler(const int fd, const short which, void *arg) { } evtimer_set(&clockevent, clock_handler, 0); + event_base_set(main_base, &clockevent); evtimer_add(&clockevent, &t); set_current_time(); @@ -2208,22 +2306,28 @@ static void delete_handler(const int fd, const short which, void *arg) { } evtimer_set(&deleteevent, delete_handler, 0); + event_base_set(main_base, &deleteevent); evtimer_add(&deleteevent, &t); - { - int i=0, j=0; - for (i=0; i<delcurr; i++) { - item *it = todelete[i]; - if (item_delete_lock_over(it)) { - assert(it->refcount > 0); - it->it_flags &= ~ITEM_DELETED; - item_unlink(it); - item_remove(it); - } else { - todelete[j++] = it; - } + run_deferred_deletes(); +} + +/* Call run_deferred_deletes instead of this. */ +void do_run_deferred_deletes(void) +{ + int i, j = 0; + + for (i = 0; i < delcurr; i++) { + item *it = todelete[i]; + if (item_delete_lock_over(it)) { + assert(it->refcount > 0); + it->it_flags &= ~ITEM_DELETED; + do_item_unlink(it); + do_item_remove(it); + } else { + todelete[j++] = it; } - delcurr = j; } + delcurr = j; } static void usage(void) { @@ -2247,6 +2351,9 @@ static void usage(void) { "-P <file> save PID in <file>, only used with -d option\n" "-f <factor> chunk size growth factor, default 1.25\n" "-n <bytes> minimum space allocated for key+value+flags, default 48\n"); +#ifdef USE_THREADS + printf("-t <num> number of threads to use, default 4\n"); +#endif return; } @@ -2356,7 +2463,6 @@ static void sig_handler(const int sig) { int main (int argc, char **argv) { int c; - conn *u_conn; struct in_addr addr; bool lock_memory = false; bool daemonize = false; @@ -2377,7 +2483,7 @@ int main (int argc, char **argv) { setbuf(stderr, NULL); /* process arguments */ - while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:")) != -1) { + while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:n:t:D:")) != -1) { switch (c) { case 'U': settings.udpport = atoi(optarg); @@ -2446,6 +2552,21 @@ int main (int argc, char **argv) { return 1; } break; + case 't': + settings.num_threads = atoi(optarg); + if (settings.num_threads == 0) { + fprintf(stderr, "Number of threads must be greater than 0\n"); + return 1; + } + break; + case 'D': + if (! optarg || ! optarg[0]) { + fprintf(stderr, "No delimiter specified\n"); + return 1; + } + settings.prefix_delimiter = optarg[0]; + settings.detail_enabled = 1; + break; default: fprintf(stderr, "Illegal argument \"%c\"\n", c); return 1; @@ -2559,10 +2680,11 @@ int main (int argc, char **argv) { } } + /* initialize main thread libevent instance */ + main_base = event_init(); /* initialize other stuff */ item_init(); - event_init(); stats_init(); assoc_init(); conn_init(); @@ -2599,16 +2721,16 @@ int main (int argc, char **argv) { exit(EXIT_FAILURE); } /* create the initial listening connection */ - if (!(listen_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST, 1, false))) { + if (!(listen_conn = conn_new(l_socket, conn_listening, + EV_READ | EV_PERSIST, 1, false, main_base))) { fprintf(stderr, "failed to create listening connection"); exit(EXIT_FAILURE); } - /* create the initial listening udp connection */ - if (u_socket > -1 && - !(u_conn = conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, UDP_READ_BUFFER_SIZE, true))) { - fprintf(stderr, "failed to create udp connection"); - exit(EXIT_FAILURE); - } + /* save the PID in if we're a daemon */ + if (daemonize) + save_pid(getpid(), pid_file); + /* start up worker threads if MT mode */ + thread_init(settings.num_threads, main_base); /* initialise clock event */ clock_handler(0, 0, 0); /* initialise deletion array and timer event */ @@ -2616,11 +2738,16 @@ int main (int argc, char **argv) { delcurr = 0; todelete = malloc(sizeof(item *) * deltotal); delete_handler(0, 0, 0); /* sets up the event */ - /* save the PID in if we're a daemon */ - if (daemonize) - save_pid(getpid(), pid_file); - /* enter the loop */ - event_loop(0); + /* create the initial listening udp connection, monitored on all threads */ + if (u_socket > -1) { + for (c = 0; c < settings.num_threads; c++) { + /* this is guaranteed to hit all threads because we round-robin */ + dispatch_conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, + UDP_READ_BUFFER_SIZE, 1); + } + } + /* enter the event loop */ + event_base_loop(main_base, 0); /* remove the PID file if we're a daemon */ if (daemonize) remove_pidfile(pid_file); diff --git a/memcached.h b/memcached.h index 2990f66..852e52e 100644 --- a/memcached.h +++ b/memcached.h @@ -1,5 +1,11 @@ /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* $Id$ */ +#include "config.h" +#include <sys/types.h> +#include <sys/time.h> +#include <netinet/in.h> +#include <event.h> + #define DATA_BUFFER_SIZE 2048 #define UDP_READ_BUFFER_SIZE 65536 #define UDP_MAX_PAYLOAD_SIZE 1400 @@ -69,6 +75,9 @@ struct settings { char *socketpath; /* path to unix socket if using local socket */ double factor; /* chunk size growth factor */ int chunk_size; + int num_threads; /* number of libevent threads to run */ + char prefix_delimiter; /* character that marks a key prefix (for stats) */ + int detail_enabled; /* nonzero if we're collecting detailed stats */ }; extern struct stats stats; @@ -199,6 +208,120 @@ extern volatile rel_time_t current_time; * Functions */ +conn *do_conn_from_freelist(); +int do_conn_add_to_freelist(conn *c); +char *do_defer_delete(item *item, time_t exptime); +void do_run_deferred_deletes(void); +char *do_add_delta(item *item, int incr, unsigned int delta, char *buf); +int do_store_item(item *item, int comm); +conn *conn_new(const int sfd, const int init_state, const int event_flags, const int read_buffer_size, const bool is_udp, struct event_base *base); + + +#include "stats.h" #include "slabs.h" #include "assoc.h" #include "items.h" + + +/* + * In multithreaded mode, we wrap certain functions with lock management and + * replace the logic of some other functions. All wrapped functions have + * "mt_" and "do_" variants. In multithreaded mode, the plain version of a + * function is #define-d to the "mt_" variant, which often just grabs a + * lock and calls the "do_" function. In singlethreaded mode, the "do_" + * function is called directly. + * + * Functions such as the libevent-related calls that need to do cross-thread + * communication in multithreaded mode (rather than actually doing the work + * in the current thread) are called via "dispatch_" frontends, which are + * also #define-d to directly call the underlying code in singlethreaded mode. + */ +#ifdef USE_THREADS + +void thread_init(int nthreads, struct event_base *main_base); +int dispatch_event_add(int thread, conn *c); +void dispatch_conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp); + +/* Lock wrappers for cache functions that are called from main loop. */ +char *mt_add_delta(item *item, int incr, unsigned int delta, char *buf); +conn *mt_conn_from_freelist(void); +int mt_conn_add_to_freelist(conn *c); +char *mt_defer_delete(item *it, time_t exptime); +int mt_is_listen_thread(void); +item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes); +void mt_item_flush_expired(void); +item *mt_item_get_notedeleted(char *key, size_t nkey, int *delete_locked); +item *mt_item_get_nocheck(char *key, size_t nkey); +int mt_item_link(item *it); +void mt_item_remove(item *it); +int mt_item_replace(item *it, item *new_it); +void mt_item_unlink(item *it); +void mt_item_update(item *it); +void mt_run_deferred_deletes(void); +void *mt_slabs_alloc(size_t size); +void mt_slabs_free(void *ptr, size_t size); +int mt_slabs_reassign(unsigned char srcid, unsigned char dstid); +char *mt_slabs_stats(int *buflen); +void mt_stats_lock(void); +void mt_stats_unlock(void); +int mt_store_item(item *item, int comm); + + +# define add_delta(x,y,z,a) mt_add_delta(x,y,z,a) +# define assoc_move_next_bucket() mt_assoc_move_next_bucket() +# define conn_from_freelist() mt_conn_from_freelist() +# define conn_add_to_freelist(x) mt_conn_add_to_freelist(x) +# define defer_delete(x,y) mt_defer_delete(x,y) +# define is_listen_thread() mt_is_listen_thread() +# define item_alloc(x,y,z,a,b) mt_item_alloc(x,y,z,a,b) +# define item_flush_expired() mt_item_flush_expired() +# define item_get_nocheck(x,y) mt_item_get_nocheck(x,y) +# define item_get_notedeleted(x,y,z) mt_item_get_notedeleted(x,y,z) +# define item_link(x) mt_item_link(x) +# define item_remove(x) mt_item_remove(x) +# define item_replace(x,y) mt_item_replace(x,y) +# define item_update(x) mt_item_update(x) +# define item_unlink(x) mt_item_unlink(x) +# define run_deferred_deletes() mt_run_deferred_deletes() +# define slabs_alloc(x) mt_slabs_alloc(x) +# define slabs_free(x,y) mt_slabs_free(x,y) +# define slabs_reassign(x,y) mt_slabs_reassign(x,y) +# define slabs_stats(x) mt_slabs_stats(x) +# define store_item(x,y) mt_store_item(x,y) + +# define STATS_LOCK() mt_stats_lock() +# define STATS_UNLOCK() mt_stats_unlock() + +#else /* !USE_THREADS */ + +# define add_delta(x,y,z,a) do_add_delta(x,y,z,a) +# define assoc_move_next_bucket() do_assoc_move_next_bucket() +# define conn_from_freelist() do_conn_from_freelist() +# define conn_add_to_freelist(x) do_conn_add_to_freelist(x) +# define defer_delete(x,y) do_defer_delete(x,y) +# define dispatch_conn_new(x,y,z,a,b) conn_new(x,y,z,a,b,main_base) +# define dispatch_event_add(t,c) event_add(&(c)->event, 0) +# define is_listen_thread() 1 +# define item_alloc(x,y,z,a,b) do_item_alloc(x,y,z,a,b) +# define item_flush_expired() do_item_flush_expired() +# define item_get_nocheck(x,y) do_item_get_nocheck(x,y) +# define item_get_notedeleted(x,y,z) do_item_get_notedeleted(x,y,z) +# define item_link(x) do_item_link(x) +# define item_remove(x) do_item_remove(x) +# define item_replace(x,y) do_item_replace(x,y) +# define item_unlink(x) do_item_unlink(x) +# define item_update(x) do_item_update(x) +# define run_deferred_deletes() do_run_deferred_deletes() +# define slabs_alloc(x) do_slabs_alloc(x) +# define slabs_free(x,y) do_slabs_free(x,y) +# define slabs_reassign(x,y) do_slabs_reassign(x,y) +# define slabs_stats(x) do_slabs_stats(x) +# define store_item(x,y) do_store_item(x,y) +# define thread_init(x,y) 0 + +# define STATS_LOCK() /**/ +# define STATS_UNLOCK() /**/ + +#endif /* !USE_THREADS */ + + @@ -9,10 +9,8 @@ * * $Id$ */ -#include "config.h" -#include <sys/types.h> +#include "memcached.h" #include <sys/stat.h> -#include <sys/time.h> #include <sys/socket.h> #include <sys/signal.h> #include <sys/resource.h> @@ -21,18 +19,15 @@ #include <stdio.h> #include <string.h> #include <unistd.h> -#include <netinet/in.h> #include <errno.h> -#include <event.h> #include <assert.h> #include <stdbool.h> -#include "memcached.h" - #define POWER_SMALLEST 1 #define POWER_LARGEST 200 #define POWER_BLOCK 1048576 #define CHUNK_ALIGN_BYTES (sizeof(void *)) +#define DONT_PREALLOC_SLABS /* powers-of-N allocation structures */ @@ -63,7 +58,7 @@ static int power_largest; /* * Forward Declarations */ -static int slabs_newslab(const unsigned int id); +static int do_slabs_newslab(const unsigned int id); #ifndef DONT_PREALLOC_SLABS /* Preallocate as many slab pages as possible (called from slabs_init) @@ -161,7 +156,7 @@ static void slabs_preallocate (const unsigned int maxslabs) { for (i = POWER_SMALLEST; i <= POWER_LARGEST; i++) { if (++prealloc > maxslabs) return; - slabs_newslab(i); + do_slabs_newslab(i); } } @@ -179,7 +174,7 @@ static int grow_slab_list (const unsigned int id) { return 1; } -static int slabs_newslab(const unsigned int id) { +static int do_slabs_newslab(const unsigned int id) { slabclass_t *p = &slabclass[id]; #ifdef ALLOW_SLABS_REASSIGN int len = POWER_BLOCK; @@ -206,7 +201,7 @@ static int slabs_newslab(const unsigned int id) { } /*@null@*/ -void *slabs_alloc(const size_t size) { +void *do_slabs_alloc(const size_t size) { slabclass_t *p; unsigned int id = slabs_clsid(size); @@ -225,7 +220,7 @@ void *slabs_alloc(const size_t size) { /* fail unless we have space at the end of a recently allocated page, we have something on our freelist, or we could allocate a new page */ - if (! (p->end_page_ptr != 0 || p->sl_curr != 0 || slabs_newslab(id) != 0)) + if (! (p->end_page_ptr != 0 || p->sl_curr != 0 || do_slabs_newslab(id) != 0)) return 0; /* return off our freelist, if we have one */ @@ -246,7 +241,7 @@ void *slabs_alloc(const size_t size) { return NULL; /* shouldn't ever get here */ } -void slabs_free(void *ptr, const size_t size) { +void do_slabs_free(void *ptr, const size_t size) { unsigned char id = slabs_clsid(size); slabclass_t *p; @@ -276,7 +271,7 @@ void slabs_free(void *ptr, const size_t size) { } /*@null@*/ -char* slabs_stats(int *buflen) { +char* do_slabs_stats(int *buflen) { int i, total; char *buf = (char *)malloc(power_largest * 200 + 100); char *bufcurr = buf; @@ -318,7 +313,7 @@ char* slabs_stats(int *buflen) { 1 = success 0 = fail -1 = tried. busy. send again shortly. */ -int slabs_reassign(unsigned char srcid, unsigned char dstid) { +int do_slabs_reassign(unsigned char srcid, unsigned char dstid) { void *slab, *slab_end; slabclass_t *p, *dp; void *iter; @@ -14,17 +14,17 @@ void slabs_init(const size_t limit, const double factor); unsigned int slabs_clsid(const size_t size); /* Allocate object of given length. 0 on error */ /*@null@*/ -void *slabs_alloc(const size_t size); +void *do_slabs_alloc(const size_t size); /* Free previously allocated object */ -void slabs_free(void *ptr, size_t size); +void do_slabs_free(void *ptr, size_t size); /* Fill buffer with stats */ /*@null@*/ -char* slabs_stats(int *buflen); +char* do_slabs_stats(int *buflen); /* Request some slab be moved between classes 1 = success 0 = fail -1 = tried. busy. send again shortly. */ -int slabs_reassign(unsigned char srcid, unsigned char dstid); +int do_slabs_reassign(unsigned char srcid, unsigned char dstid); @@ -37,7 +37,7 @@ my $sock = $server->sock; my $stats = mem_stats($sock); # Test number of keys -is(scalar(keys(%$stats)), 21, "21 stats values"); +is(scalar(keys(%$stats)), 22, "22 stats values"); # Test initial state foreach my $key (qw(curr_items total_items bytes cmd_get cmd_set get_hits evictions get_misses bytes_written)) { |