summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteven Grimm <sgrimm@facebook.com>2006-11-26 21:33:47 +0000
committerSteven Grimm <sgrimm@facebook.com>2006-11-26 21:33:47 +0000
commit217dcce072c268a9f404c3dcda89aa55c1a7423f (patch)
tree6a4063bfdf2cc3c073a3f0a31601c057ceb4f1cb
parent117ca7fc923dfac10d033dec47175eef09817926 (diff)
downloadmemcached-217dcce072c268a9f404c3dcda89aa55c1a7423f.tar.gz
Incorporate changes from "performance" branch (revisions 414-419).
git-svn-id: http://code.sixapart.com/svn/memcached/trunk/server@450 b0b603af-a30f-0410-a34e-baf09ae79d0b
-rw-r--r--ChangeLog18
-rw-r--r--assoc.c161
-rw-r--r--items.c57
-rw-r--r--memcached.c797
-rw-r--r--memcached.h21
5 files changed, 699 insertions, 355 deletions
diff --git a/ChangeLog b/ChangeLog
index ca25493..c21e69b 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,21 @@
+2006-11-26
+ * Steven Grimm <sgrimm@facebook.com>: Performance improvements:
+
+ Dynamic sizing of hashtable to reduce collisions on very large
+ caches and conserve memory on small caches.
+
+ Only reposition items in the LRU queue once a minute, to reduce
+ overhead of accessing extremely frequently-used items.
+
+ Stop listening for new connections until an existing one closes
+ if we run out of available file descriptors.
+
+ Command parser refactoring: Add a single-pass tokenizer to cut
+ down on string scanning. Split the command processing into
+ separate functions for easier profiling and better readability.
+ Pass key lengths along with the keys in all API functions that
+ need keys, to avoid needing to call strlen() repeatedly.
+
2006-11-25
* Steve Peters <steve@fisharerojo.org>: OpenBSD has a malloc.h,
but warns to use stdlib.h instead
diff --git a/assoc.c b/assoc.c
index 06cc40e..4f7a8ac 100644
--- a/assoc.c
+++ b/assoc.c
@@ -31,15 +31,6 @@
#include "memcached.h"
-typedef unsigned long int ub4; /* unsigned 4-byte quantities */
-typedef unsigned char ub1; /* unsigned 1-byte quantities */
-
-/* hard-code one million buckets, for now (2**20 == 4MB hash) */
-#define HASHPOWER 20
-
-#define hashsize(n) ((ub4)1<<(n))
-#define hashmask(n) (hashsize(n)-1)
-
/*
* Since the hash function does bit manipulation, it needs to know
* whether it's big or little-endian. ENDIAN_LITTLE and ENDIAN_BIG
@@ -459,25 +450,64 @@ uint32_t hash( const void *key, size_t length, uint32_t initval)
#error Must define HASH_BIG_ENDIAN or HASH_LITTLE_ENDIAN
#endif // hash_XXX_ENDIAN == 1
-static item** hashtable = 0;
+typedef unsigned long int ub4; /* unsigned 4-byte quantities */
+typedef unsigned char ub1; /* unsigned 1-byte quantities */
+
+/* how many powers of 2's worth of buckets we use */
+int hashpower = 16;
+
+#define hashsize(n) ((ub4)1<<(n))
+#define hashmask(n) (hashsize(n)-1)
+
+/* Main hash table. This is where we look except during expansion. */
+static item** primary_hashtable = 0;
+
+/*
+ * Previous hash table. During expansion, we look here for keys that haven't
+ * been moved over to the primary yet.
+ */
+static item** old_hashtable = 0;
+
+/* Number of items in the hash table. */
+static int hash_items = 0;
+
+/* Flag: Are we in the middle of expanding now? */
+static int expanding = 0;
+
+/*
+ * During expansion we migrate values with bucket granularity; this is how
+ * far we've gotten so far. Ranges from 0 .. hashsize(hashpower - 1) - 1.
+ */
+static int expand_bucket = 0;
void assoc_init(void) {
- unsigned int hash_size = hashsize(HASHPOWER) * sizeof(void*);
- hashtable = malloc(hash_size);
- if (! hashtable) {
+ unsigned int hash_size = hashsize(hashpower) * sizeof(void*);
+ primary_hashtable = malloc(hash_size);
+ if (! primary_hashtable) {
fprintf(stderr, "Failed to init hashtable.\n");
exit(1);
}
- memset(hashtable, 0, hash_size);
+ memset(primary_hashtable, 0, hash_size);
}
-item *assoc_find(char *key) {
- ub4 hv = hash(key, strlen(key), 0) & hashmask(HASHPOWER);
- item *it = hashtable[hv];
+item *assoc_find(const char *key, size_t nkey) {
+ uint32_t hv = hash(key, nkey, 0);
+ item *it;
+ int oldbucket;
+
+ if (expanding &&
+ (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
+ {
+ it = old_hashtable[oldbucket];
+ } else {
+ it = primary_hashtable[hv & hashmask(hashpower)];
+ }
while (it) {
- if (strcmp(key, ITEM_key(it)) == 0)
+ if ((nkey == it->nkey) &&
+ (memcmp(key, ITEM_key(it), nkey) == 0)) {
return it;
+ }
it = it->h_next;
}
return 0;
@@ -486,35 +516,104 @@ item *assoc_find(char *key) {
/* returns the address of the item pointer before the key. if *item == 0,
the item wasn't found */
-static item** _hashitem_before (char *key) {
- ub4 hv = hash(key, strlen(key), 0) & hashmask(HASHPOWER);
- item **pos = &hashtable[hv];
+static item** _hashitem_before (const char *key, size_t nkey) {
+ uint32_t hv = hash(key, nkey, 0);
+ item **pos;
+ int oldbucket;
- while (*pos && strcmp(key, ITEM_key(*pos))) {
+ if (expanding &&
+ (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
+ {
+ pos = &old_hashtable[oldbucket];
+ } else {
+ pos = &primary_hashtable[hv & hashmask(hashpower)];
+ }
+
+ while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
pos = &(*pos)->h_next;
}
return pos;
}
+/* grows the hashtable to the next power of 2. */
+static void assoc_expand(void) {
+ old_hashtable = primary_hashtable;
+
+ primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
+ if (primary_hashtable) {
+ if (settings.verbose > 1)
+ fprintf(stderr, "Hash table expansion starting\n");
+ hashpower++;
+ expanding = 1;
+ expand_bucket = 0;
+ assoc_move_next_bucket();
+ } else {
+ primary_hashtable = old_hashtable;
+ /* Bad news, but we can keep running. */
+ }
+}
+
+/* migrates the next bucket to the primary hashtable if we're expanding. */
+void assoc_move_next_bucket(void) {
+ item *it, *next;
+ int bucket;
+
+ if (expanding) {
+ for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
+ next = it->h_next;
+
+ bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
+ it->h_next = primary_hashtable[bucket];
+ primary_hashtable[bucket] = it;
+ }
+
+ expand_bucket++;
+ if (expand_bucket == hashsize(hashpower - 1)) {
+ expanding = 0;
+ free(old_hashtable);
+ if (settings.verbose > 1)
+ fprintf(stderr, "Hash table expansion done\n");
+ }
+ }
+}
+
/* Note: this isn't an assoc_update. The key must not already exist to call this */
-int assoc_insert(char *key, item *it) {
- ub4 hv;
- assert(assoc_find(key) == 0); /* shouldn't have duplicately named things defined */
- hv = hash(key, strlen(key), 0) & hashmask(HASHPOWER);
- it->h_next = hashtable[hv];
- hashtable[hv] = it;
+int assoc_insert(item *it) {
+ uint32_t hv;
+ int oldbucket;
+
+ assert(assoc_find(ITEM_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */
+
+ hv = hash(ITEM_key(it), it->nkey, 0);
+ if (expanding &&
+ (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
+ {
+ it->h_next = old_hashtable[oldbucket];
+ old_hashtable[oldbucket] = it;
+ } else {
+ it->h_next = primary_hashtable[hv & hashmask(hashpower)];
+ primary_hashtable[hv & hashmask(hashpower)] = it;
+ }
+
+ hash_items++;
+ if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
+ assoc_expand();
+ }
+
return 1;
}
-void assoc_delete(char *key) {
- item **before = _hashitem_before(key);
+void assoc_delete(const char *key, size_t nkey) {
+ item **before = _hashitem_before(key, nkey);
+
if (*before) {
item *nxt = (*before)->h_next;
(*before)->h_next = 0; /* probably pointless, but whatever. */
*before = nxt;
+ hash_items--;
return;
}
- /* Note: we never actually get here. the callers don't delete things
+ /* Note: we never actually get here. the callers don't delete things
they can't find. */
assert(*before != 0);
}
diff --git a/items.c b/items.c
index 09b8b5e..18b750e 100644
--- a/items.c
+++ b/items.c
@@ -19,6 +19,12 @@
#include "memcached.h"
+/*
+ * We only reposition items in the LRU queue if they haven't been repositioned
+ * in this many seconds. That saves us from churning on frequently-accessed
+ * items.
+ */
+#define ITEM_UPDATE_INTERVAL 60
#define LARGEST_ID 255
static item *heads[LARGEST_ID];
@@ -38,28 +44,29 @@ void item_init(void) {
/*
* Generates the variable-sized part of the header for an object.
*
+ * key - The key
+ * nkey - The length of the key
+ * flags - key flags
+ * nbytes - Number of bytes to hold value and addition CRLF terminator
* suffix - Buffer for the "VALUE" line suffix (flags, size).
* nsuffix - The length of the suffix is stored here.
- * keylen - The length of the key plus any padding required to word-align the
- * "VALUE" suffix (which is done to speed up copying.)
*
* Returns the total size of the header.
*/
-int item_make_header(char *key, int flags, int nbytes,
- char *suffix, int *nsuffix, int *keylen) {
- *keylen = strlen(key) + 1; if(*keylen % 4) *keylen += 4 - (*keylen % 4);
+int item_make_header(char *key, uint8_t nkey, int flags, int nbytes,
+ char *suffix, int *nsuffix) {
*nsuffix = sprintf(suffix, " %u %u\r\n", flags, nbytes - 2);
- return sizeof(item) + *keylen + *nsuffix + nbytes;
+ return sizeof(item) + nkey + *nsuffix + nbytes;
}
-
-item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) {
- int nsuffix, ntotal, len;
+
+item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
+ int nsuffix, ntotal;
item *it;
unsigned int id;
char suffix[40];
- ntotal = item_make_header(key, flags, nbytes, suffix, &nsuffix, &len);
-
+ ntotal = item_make_header(key, nkey + 1, flags, nbytes, suffix, &nsuffix);
+
id = slabs_clsid(ntotal);
if (id == 0)
return 0;
@@ -75,8 +82,8 @@ item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) {
if (!settings.evict_to_free) return 0;
- /*
- * try to get one off the right LRU
+ /*
+ * try to get one off the right LRU
* don't necessariuly unlink the tail because it may be locked: refcount>0
* search up from tail an item with refcount==0 and unlink it; give up after 50
* tries
@@ -104,7 +111,7 @@ item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) {
it->next = it->prev = it->h_next = 0;
it->refcount = 0;
it->it_flags = 0;
- it->nkey = len;
+ it->nkey = nkey;
it->nbytes = nbytes;
strcpy(ITEM_key(it), key);
it->exptime = exptime;
@@ -130,12 +137,12 @@ void item_free(item *it) {
* Returns true if an item will fit in the cache (its size does not exceed
* the maximum for a cache entry.)
*/
-int item_size_ok(char *key, int flags, int nbytes) {
+int item_size_ok(char *key, size_t nkey, int flags, int nbytes) {
char prefix[40];
- int keylen, nsuffix;
+ int nsuffix;
- return slabs_clsid(item_make_header(key, flags, nbytes,
- prefix, &nsuffix, &keylen)) != 0;
+ return slabs_clsid(item_make_header(key, nkey + 1, flags, nbytes,
+ prefix, &nsuffix)) != 0;
}
void item_link_q(item *it) { /* item is the new head */
@@ -184,7 +191,7 @@ int item_link(item *it) {
assert(it->nbytes < 1048576);
it->it_flags |= ITEM_LINKED;
it->time = current_time;
- assoc_insert(ITEM_key(it), it);
+ assoc_insert(it);
stats.curr_bytes += ITEM_ntotal(it);
stats.curr_items += 1;
@@ -200,7 +207,7 @@ void item_unlink(item *it) {
it->it_flags &= ~ITEM_LINKED;
stats.curr_bytes -= ITEM_ntotal(it);
stats.curr_items -= 1;
- assoc_delete(ITEM_key(it));
+ assoc_delete(ITEM_key(it), it->nkey);
item_unlink_q(it);
}
if (it->refcount == 0) item_free(it);
@@ -216,11 +223,13 @@ void item_remove(item *it) {
}
void item_update(item *it) {
- assert((it->it_flags & ITEM_SLABBED) == 0);
+ if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
+ assert((it->it_flags & ITEM_SLABBED) == 0);
- item_unlink_q(it);
- it->time = current_time;
- item_link_q(it);
+ item_unlink_q(it);
+ it->time = current_time;
+ item_link_q(it);
+ }
}
int item_replace(item *it, item *new_it) {
diff --git a/memcached.c b/memcached.c
index 47c8809..53580af 100644
--- a/memcached.c
+++ b/memcached.c
@@ -72,6 +72,7 @@ struct settings settings;
static item **todelete = 0;
static int delcurr;
static int deltotal;
+static conn *listen_conn;
#define TRANSMIT_COMPLETE 0
#define TRANSMIT_INCOMPLETE 1
@@ -104,6 +105,7 @@ void stats_init(void) {
values are now false in boolean context... */
stats.started = time(0) - 2;
}
+
void stats_reset(void) {
stats.total_items = stats.total_conns = 0;
stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0;
@@ -133,8 +135,8 @@ int item_delete_lock_over (item *it) {
}
/* wrapper around assoc_find which does the lazy expiration/deletion logic */
-item *get_item_notedeleted(char *key, int *delete_locked) {
- item *it = assoc_find(key);
+item *get_item_notedeleted(char *key, size_t nkey, int *delete_locked) {
+ item *it = assoc_find(key, nkey);
if (delete_locked) *delete_locked = 0;
if (it && (it->it_flags & ITEM_DELETED)) {
/* it's flagged as delete-locked. let's see if that condition
@@ -157,8 +159,8 @@ item *get_item_notedeleted(char *key, int *delete_locked) {
return it;
}
-item *get_item(char *key) {
- return get_item_notedeleted(key, 0);
+item *get_item(char *key, size_t nkey) {
+ return get_item_notedeleted(key, nkey, 0);
}
/*
@@ -357,6 +359,7 @@ void conn_close(conn *c) {
fprintf(stderr, "<%d connection closed.\n", c->sfd);
close(c->sfd);
+ accept_new_conns(1);
conn_cleanup(c);
/* if the connection has big buffers, just free it */
@@ -390,12 +393,12 @@ int do_realloc(void **orig, int newsize, int bytes_per_item, int *size) {
if (newbuf) {
*orig = newbuf;
*size = newsize;
- return 1;
+ return 1;
}
return 0;
}
- /*
+/*
* Shrinks a connection's buffers if they're too big. This prevents
* periodic large "get" requests from permanently chewing lots of server
* memory.
@@ -408,7 +411,10 @@ void conn_shrink(conn *c) {
return;
if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
- do_realloc((void **)&c->rbuf, DATA_BUFFER_SIZE, 1, &c->rsize);
+ if (c->rcurr != c->rbuf)
+ memmove(c->rbuf, c->rcurr, c->rbytes);
+ do_realloc((void **)&c->rbuf, DATA_BUFFER_SIZE, 1, &c->rsize);
+ c->rcurr = c->rbuf;
}
if (c->isize > ITEM_LIST_HIGHWAT) {
@@ -433,6 +439,7 @@ void conn_set_state(conn *c, int state) {
if (state != c->state) {
if (state == conn_read) {
conn_shrink(c);
+ assoc_move_next_bucket();
}
c->state = state;
}
@@ -475,7 +482,6 @@ int ensure_iov_space(conn *c) {
int add_iov(conn *c, const void *buf, int len) {
struct msghdr *m;
- int i;
int leftover;
int limit_to_mtu;
@@ -490,7 +496,7 @@ int add_iov(conn *c, const void *buf, int len) {
/* We may need to start a new msghdr if this one is full. */
if (m->msg_iovlen == IOV_MAX ||
- limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE) {
+ (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
add_msghdr(c);
m = &c->msglist[c->msgused - 1];
}
@@ -602,7 +608,7 @@ void complete_nread(conn *c) {
goto err;
}
- old_it = get_item_notedeleted(key, &delete_locked);
+ old_it = get_item_notedeleted(key, it->nkey, &delete_locked);
if (old_it && comm == NREAD_ADD) {
item_update(old_it); /* touches item, promotes to head of LRU */
@@ -625,7 +631,7 @@ void complete_nread(conn *c) {
window... in which case we have to find the old hidden item
that's in the namespace/LRU but wasn't returned by
get_item.... because we need to replace it (below) */
- old_it = assoc_find(key);
+ old_it = assoc_find(key, it->nkey);
}
if (old_it)
@@ -643,10 +649,94 @@ err:
return;
}
-void process_stat(conn *c, char *command) {
+typedef struct token_s {
+ char* value;
+ size_t length;
+} token_t;
+
+#define COMMAND_TOKEN 0
+#define SUBCOMMAND_TOKEN 1
+#define KEY_TOKEN 1
+#define KEY_MAX_LENGTH 250
+
+#define MAX_TOKENS 6
+
+/*
+ * Tokenize the command string by replacing whitespace with '\0' and update
+ * the token array tokens with pointer to start of each token and length.
+ * Returns total number of tokens. The last valid token is the terminal
+ * token (value points to the first unprocessed character of the string and
+ * length zero).
+ *
+ * Usage example:
+ *
+ * while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
+ * for(int ix = 0; tokens[ix].length != 0; ix++) {
+ * ...
+ * }
+ * ncommand = tokens[ix].value - command;
+ * command = tokens[ix].value;
+ * }
+ */
+size_t tokenize_command(char* command, token_t* tokens, size_t max_tokens) {
+ char* cp;
+ char* value = NULL;
+ size_t length = 0;
+ size_t ntokens = 0;
+
+ assert(command != NULL && tokens != NULL && max_tokens > 1);
+
+ cp = command;
+ while(*cp != '\0' && ntokens < max_tokens - 1) {
+ if(*cp == ' ') {
+ // If we've accumulated a token, this is the end of it.
+ if(length > 0) {
+ tokens[ntokens].value = value;
+ tokens[ntokens].length = length;
+ ntokens++;
+ length = 0;
+ value = NULL;
+ }
+ *cp = '\0';
+ } else {
+ if(length == 0) {
+ value = cp;
+ }
+ length++;
+ }
+ cp++;
+ }
+
+ if(ntokens < max_tokens - 1 && length > 0) {
+ tokens[ntokens].value = value;
+ tokens[ntokens].length = length;
+ ntokens++;
+ }
+
+ /*
+ * If we scanned the whole string, the terminal value pointer is null,
+ * otherwise it is the first unprocessed character.
+ */
+ tokens[ntokens].value = *cp == '\0' ? NULL : cp;
+ tokens[ntokens].length = 0;
+ ntokens++;
+
+ return ntokens;
+}
+
+void process_stat(conn *c, token_t* tokens, size_t ntokens) {
rel_time_t now = current_time;
+ char* command;
+ char* subcommand;
- if (strcmp(command, "stats") == 0) {
+ if(ntokens < 2) {
+ out_string(c, "CLIENT_ERROR bad command line");
+ return;
+ }
+
+ command = tokens[COMMAND_TOKEN].value;
+
+ if (ntokens == 2 && strcmp(command, "stats") == 0) {
char temp[1024];
pid_t pid = getpid();
char *pos = temp;
@@ -679,7 +769,9 @@ void process_stat(conn *c, char *command) {
return;
}
- if (strcmp(command, "stats reset") == 0) {
+ subcommand = tokens[SUBCOMMAND_TOKEN].value;
+
+ if (strcmp(subcommand, "reset") == 0) {
stats_reset();
out_string(c, "RESET");
return;
@@ -687,7 +779,7 @@ void process_stat(conn *c, char *command) {
#ifdef HAVE_MALLOC_H
#ifdef HAVE_STRUCT_MALLINFO
- if (strcmp(command, "stats malloc") == 0) {
+ if (strcmp(subcommand, "malloc") == 0) {
char temp[512];
struct mallinfo info;
char *pos = temp;
@@ -709,7 +801,7 @@ void process_stat(conn *c, char *command) {
#endif /* HAVE_STRUCT_MALLINFO */
#endif /* HAVE_MALLOC_H */
- if (strcmp(command, "stats maps") == 0) {
+ if (strcmp(subcommand, "maps") == 0) {
char *wbuf;
int wsize = 8192; /* should be enough */
int fd;
@@ -720,7 +812,7 @@ void process_stat(conn *c, char *command) {
out_string(c, "SERVER_ERROR out of memory");
return;
}
-
+
fd = open("/proc/self/maps", O_RDONLY);
if (fd == -1) {
out_string(c, "SERVER_ERROR cannot open the maps file");
@@ -742,22 +834,31 @@ void process_stat(conn *c, char *command) {
strcpy(wbuf + res, "END\r\n");
c->write_and_free=wbuf;
c->wcurr=wbuf;
- c->wbytes = res + 6;
+ c->wbytes = res + 5; // Don't write the terminal '\0'
conn_set_state(c, conn_write);
c->write_and_go = conn_read;
close(fd);
return;
}
- if (strncmp(command, "stats cachedump", 15) == 0) {
+ if (strcmp(subcommand, "cachedump") == 0) {
+
char *buf;
unsigned int bytes, id, limit = 0;
- char *start = command + 15;
- if (sscanf(start, "%u %u\r\n", &id, &limit) < 1) {
+
+ if(ntokens < 5) {
out_string(c, "CLIENT_ERROR bad command line");
return;
}
+ id = strtoul(tokens[2].value, NULL, 10);
+ limit = strtoul(tokens[3].value, NULL, 10);
+
+ if(errno == ERANGE) {
+ out_string(c, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
buf = item_cachedump(id, limit, &bytes);
if (buf == 0) {
out_string(c, "SERVER_ERROR out of memory");
@@ -772,7 +873,7 @@ void process_stat(conn *c, char *command) {
return;
}
- if (strcmp(command, "stats slabs")==0) {
+ if (strcmp(subcommand, "slabs")==0) {
int bytes = 0;
char *buf = slabs_stats(&bytes);
if (!buf) {
@@ -787,14 +888,14 @@ void process_stat(conn *c, char *command) {
return;
}
- if (strcmp(command, "stats items")==0) {
+ if (strcmp(subcommand, "items")==0) {
char buffer[4096];
item_stats(buffer, 4096);
out_string(c, buffer);
return;
}
- if (strcmp(command, "stats sizes")==0) {
+ if (strcmp(subcommand, "sizes")==0) {
int bytes = 0;
char *buf = item_stats_sizes(&bytes);
if (! buf) {
@@ -813,171 +914,39 @@ void process_stat(conn *c, char *command) {
out_string(c, "ERROR");
}
-void process_command(conn *c, char *command) {
-
- int comm = 0;
- int incr = 0;
-
- /*
- * for commands set/add/replace, we build an item and read the data
- * directly into it, then continue in nread_complete().
- */
-
- if (settings.verbose > 1)
- fprintf(stderr, "<%d %s\n", c->sfd, command);
-
- c->msgcurr = 0;
- c->msgused = 0;
- c->iovused = 0;
- if (add_msghdr(c)) {
- out_string(c, "SERVER_ERROR out of memory");
- return;
- }
-
- if ((strncmp(command, "add ", 4) == 0 && (comm = NREAD_ADD)) ||
- (strncmp(command, "set ", 4) == 0 && (comm = NREAD_SET)) ||
- (strncmp(command, "replace ", 8) == 0 && (comm = NREAD_REPLACE))) {
-
- char key[251];
- int flags;
- time_t expire;
- int len, res;
- item *it;
-
- res = sscanf(command, "%*s %250s %u %ld %d\n", key, &flags, &expire, &len);
- if (res!=4 || strlen(key)==0 ) {
- out_string(c, "CLIENT_ERROR bad command line format");
- return;
- }
-
- if (settings.managed) {
- int bucket = c->bucket;
- if (bucket == -1) {
- out_string(c, "CLIENT_ERROR no BG data in managed mode");
- return;
- }
- c->bucket = -1;
- if (buckets[bucket] != c->gen) {
- out_string(c, "ERROR_NOT_OWNER");
- return;
- }
- }
-
- expire = realtime(expire);
- it = item_alloc(key, flags, expire, len+2);
-
- if (it == 0) {
- if (! item_size_ok(key, flags, len + 2))
- out_string(c, "SERVER_ERROR object too large for cache");
- else
- out_string(c, "SERVER_ERROR out of memory");
- /* swallow the data line */
- c->write_and_go = conn_swallow;
- c->sbytes = len+2;
- return;
- }
-
- c->item_comm = comm;
- c->item = it;
- c->ritem = ITEM_data(it);
- c->rlbytes = it->nbytes;
- conn_set_state(c, conn_nread);
- return;
- }
+inline void process_get_command(conn *c, token_t* tokens, size_t ntokens) {
+ char *key;
+ size_t nkey;
+ int i = 0;
+ item *it;
+ token_t* key_token = &tokens[KEY_TOKEN];
- if ((strncmp(command, "incr ", 5) == 0 && (incr = 1)) ||
- (strncmp(command, "decr ", 5) == 0)) {
- char temp[32];
- unsigned int value;
- item *it;
- unsigned int delta;
- char key[251];
- int res;
- char *ptr;
- res = sscanf(command, "%*s %250s %u\n", key, &delta);
- if (res!=2 || strlen(key)==0 ) {
- out_string(c, "CLIENT_ERROR bad command line format");
+ if (settings.managed) {
+ int bucket = c->bucket;
+ if (bucket == -1) {
+ out_string(c, "CLIENT_ERROR no BG data in managed mode");
return;
}
- if (settings.managed) {
- int bucket = c->bucket;
- if (bucket == -1) {
- out_string(c, "CLIENT_ERROR no BG data in managed mode");
- return;
- }
- c->bucket = -1;
- if (buckets[bucket] != c->gen) {
- out_string(c, "ERROR_NOT_OWNER");
- return;
- }
- }
- it = get_item(key);
- if (!it) {
- out_string(c, "NOT_FOUND");
+ c->bucket = -1;
+ if (buckets[bucket] != c->gen) {
+ out_string(c, "ERROR_NOT_OWNER");
return;
}
- ptr = ITEM_data(it);
- while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++; // BUG: can't be true
- value = atoi(ptr);
- if (incr)
- value+=delta;
- else {
- if (delta >= value) value = 0;
- else value-=delta;
- }
- sprintf(temp, "%u", value);
- res = strlen(temp);
- if (res + 2 > it->nbytes) { /* need to realloc */
- item *new_it;
- new_it = item_alloc(ITEM_key(it), atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
- if (new_it == 0) {
- out_string(c, "SERVER_ERROR out of memory");
- return;
- }
- memcpy(ITEM_data(new_it), temp, res);
- memcpy(ITEM_data(new_it) + res, "\r\n", 2);
- item_replace(it, new_it);
- } else { /* replace in-place */
- memcpy(ITEM_data(it), temp, res);
- memset(ITEM_data(it) + res, ' ', it->nbytes-res-2);
- }
- out_string(c, temp);
- return;
}
- if (strncmp(command, "bget ", 5) == 0) {
- c->binary = 1;
- goto get;
- }
- if (strncmp(command, "get ", 4) == 0) {
-
- char *start = command + 4;
- char key[251];
- int next;
- int i;
- item *it;
- rel_time_t now;
- get:
- now = current_time;
- i = 0;
-
- if (settings.managed) {
- int bucket = c->bucket;
- if (bucket == -1) {
- out_string(c, "CLIENT_ERROR no BG data in managed mode");
- return;
- }
- c->bucket = -1;
- if (buckets[bucket] != c->gen) {
- out_string(c, "ERROR_NOT_OWNER");
+ do {
+ while(key_token->length != 0) {
+
+ key = key_token->value;
+ nkey = key_token->length;
+
+ if(nkey > KEY_MAX_LENGTH) {
+ out_string(c, "CLIENT_ERROR bad command line format");
return;
}
- }
-
- while(sscanf(start, " %250s%n", key, &next) >= 1) {
- start+=next;
+
stats.get_cmds++;
- it = get_item(key);
+ it = get_item(key, nkey);
if (it) {
if (i >= c->isize) {
item **new_list = realloc(c->ilist, sizeof(item *)*c->isize*2);
@@ -986,7 +955,7 @@ void process_command(conn *c, char *command) {
c->ilist = new_list;
} else break;
}
-
+
/*
* Construct the response. Each hit adds three elements to the
* outgoing data list:
@@ -994,13 +963,12 @@ void process_command(conn *c, char *command) {
* key
* " " + flags + " " + data length + "\r\n" + data (with \r\n)
*/
- /* TODO: can we avoid the strlen() func call and cache that in wasted byte in item struct? */
if (add_iov(c, "VALUE ", 6) ||
- add_iov(c, ITEM_key(it), strlen(ITEM_key(it))) ||
+ add_iov(c, ITEM_key(it), it->nkey) ||
add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes))
- {
- break;
- }
+ {
+ break;
+ }
if (settings.verbose > 1)
fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
@@ -1009,87 +977,307 @@ void process_command(conn *c, char *command) {
item_update(it);
*(c->ilist + i) = it;
i++;
+
} else stats.get_misses++;
+
+ key_token++;
}
- c->icurr = c->ilist;
- c->ileft = i;
+ /*
+ * If the command string hasn't been fully processed, get the next set
+ * of tokens.
+ */
+ if(key_token->value != NULL) {
+ ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
+ key_token = tokens;
+ }
+
+ } while(key_token->value != NULL);
- if (settings.verbose > 1)
- fprintf(stderr, ">%d END\n", c->sfd);
- add_iov(c, "END\r\n", 5);
+ c->icurr = c->ilist;
+ c->ileft = i;
+
+ if (settings.verbose > 1)
+ fprintf(stderr, ">%d END\n", c->sfd);
+ add_iov(c, "END\r\n", 5);
+
+ if (c->udp && build_udp_headers(c)) {
+ out_string(c, "SERVER_ERROR out of memory");
+ }
+ else {
+ conn_set_state(c, conn_mwrite);
+ c->msgcurr = 0;
+ }
+ return;
+}
+
+void process_update_command(conn *c, token_t* tokens, size_t ntokens, int comm) {
+ char *key;
+ size_t nkey;
+ int flags;
+ time_t exptime;
+ int vlen;
+ item *it;
- if (c->udp && build_udp_headers(c)) {
+ if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
+ out_string(c, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ key = tokens[KEY_TOKEN].value;
+ nkey = tokens[KEY_TOKEN].length;
+
+ flags = strtoul(tokens[2].value, NULL, 10);
+ exptime = strtol(tokens[3].value, NULL, 10);
+ vlen = strtol(tokens[4].value, NULL, 10);
+
+ if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
+ out_string(c, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ if (settings.managed) {
+ int bucket = c->bucket;
+ if (bucket == -1) {
+ out_string(c, "CLIENT_ERROR no BG data in managed mode");
+ return;
+ }
+ c->bucket = -1;
+ if (buckets[bucket] != c->gen) {
+ out_string(c, "ERROR_NOT_OWNER");
+ return;
+ }
+ }
+
+ it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
+
+ if (it == 0) {
+ if (! item_size_ok(key, nkey, flags, vlen + 2))
+ out_string(c, "SERVER_ERROR object too large for cache");
+ else
out_string(c, "SERVER_ERROR out of memory");
+ /* swallow the data line */
+ c->write_and_go = conn_swallow;
+ c->sbytes = vlen+2;
+ return;
+ }
+
+ c->item_comm = comm;
+ c->item = it;
+ c->ritem = ITEM_data(it);
+ c->rlbytes = it->nbytes;
+ conn_set_state(c, conn_nread);
+ return;
+}
+
+void process_arithmetic_command(conn *c, token_t* tokens, size_t ntokens, int incr) {
+ char temp[32];
+ unsigned int value;
+ item *it;
+ unsigned int delta;
+ char *key;
+ size_t nkey;
+ int res;
+ char *ptr;
+
+ if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
+ out_string(c, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ key = tokens[KEY_TOKEN].value;
+ nkey = tokens[KEY_TOKEN].length;
+
+ if (settings.managed) {
+ int bucket = c->bucket;
+ if (bucket == -1) {
+ out_string(c, "CLIENT_ERROR no BG data in managed mode");
+ return;
}
- else {
- conn_set_state(c, conn_mwrite);
- c->msgcurr = 0;
+ c->bucket = -1;
+ if (buckets[bucket] != c->gen) {
+ out_string(c, "ERROR_NOT_OWNER");
+ return;
}
+ }
+
+ it = get_item(key, nkey);
+ if (!it) {
+ out_string(c, "NOT_FOUND");
return;
}
- if (strncmp(command, "delete ", 7) == 0) {
- char key[251];
- item *it;
- int res;
- time_t exptime = 0;
+ delta = strtoul(tokens[2].value, NULL, 10);
+
+ if(errno == ERANGE) {
+ out_string(c, "CLIENT_ERROR bad command line format");
+ return;
+ }
- if (settings.managed) {
- int bucket = c->bucket;
- if (bucket == -1) {
- out_string(c, "CLIENT_ERROR no BG data in managed mode");
- return;
- }
- c->bucket = -1;
- if (buckets[bucket] != c->gen) {
- out_string(c, "ERROR_NOT_OWNER");
- return;
- }
+ ptr = ITEM_data(it);
+ while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++; // BUG: can't be true
+
+ value = strtol(ptr, NULL, 10);
+
+ if(errno == ERANGE) {
+ out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
+ return;
+ }
+
+ if (incr)
+ value+=delta;
+ else {
+ if (delta >= value) value = 0;
+ else value-=delta;
+ }
+ sprintf(temp, "%u", value);
+ res = strlen(temp);
+ if (res + 2 > it->nbytes) { /* need to realloc */
+ item *new_it;
+ new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
+ if (new_it == 0) {
+ out_string(c, "SERVER_ERROR out of memory");
+ return;
}
- res = sscanf(command, "%*s %250s %ld", key, &exptime);
- it = get_item(key);
- if (!it) {
- out_string(c, "NOT_FOUND");
+ memcpy(ITEM_data(new_it), temp, res);
+ memcpy(ITEM_data(new_it) + res, "\r\n", 2);
+ item_replace(it, new_it);
+ } else { /* replace in-place */
+ memcpy(ITEM_data(it), temp, res);
+ memset(ITEM_data(it) + res, ' ', it->nbytes-res-2);
+ }
+ out_string(c, temp);
+ return;
+}
+
+void process_delete_command(conn *c, token_t* tokens, size_t ntokens) {
+ char *key;
+ size_t nkey;
+ item *it;
+ time_t exptime = 0;
+
+ if (settings.managed) {
+ int bucket = c->bucket;
+ if (bucket == -1) {
+ out_string(c, "CLIENT_ERROR no BG data in managed mode");
return;
}
- if (exptime == 0) {
- item_unlink(it);
- out_string(c, "DELETED");
+ c->bucket = -1;
+ if (buckets[bucket] != c->gen) {
+ out_string(c, "ERROR_NOT_OWNER");
return;
}
- if (delcurr >= deltotal) {
- item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
- if (new_delete) {
- todelete = new_delete;
- deltotal *= 2;
- } else {
- /*
- * can't delete it immediately, user wants a delay,
- * but we ran out of memory for the delete queue
- */
- out_string(c, "SERVER_ERROR out of memory");
- return;
- }
+ }
+
+ key = tokens[KEY_TOKEN].value;
+ nkey = tokens[KEY_TOKEN].length;
+
+ if(nkey > KEY_MAX_LENGTH) {
+ out_string(c, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ if(ntokens == 4) {
+ exptime = strtol(tokens[2].value, NULL, 10);
+
+ if(errno == ERANGE) {
+ out_string(c, "CLIENT_ERROR bad command line format");
+ return;
}
+ }
- it->refcount++;
- /* use its expiration time as its deletion time now */
- it->exptime = realtime(exptime);
- it->it_flags |= ITEM_DELETED;
- todelete[delcurr++] = it;
+ it = get_item(key, nkey);
+ if (!it) {
+ out_string(c, "NOT_FOUND");
+ return;
+ }
+
+ if (exptime == 0) {
+ item_unlink(it);
out_string(c, "DELETED");
return;
}
+ if (delcurr >= deltotal) {
+ item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
+ if (new_delete) {
+ todelete = new_delete;
+ deltotal *= 2;
+ } else {
+ /*
+ * can't delete it immediately, user wants a delay,
+ * but we ran out of memory for the delete queue
+ */
+ out_string(c, "SERVER_ERROR out of memory");
+ return;
+ }
+ }
+
+ it->refcount++;
+ /* use its expiration time as its deletion time now */
+ it->exptime = realtime(exptime);
+ it->it_flags |= ITEM_DELETED;
+ todelete[delcurr++] = it;
+ out_string(c, "DELETED");
+ return;
+}
- if (strncmp(command, "own ", 4) == 0) {
- int bucket, gen;
- char *start = command+4;
+void process_command(conn *c, char *command) {
+
+ token_t tokens[MAX_TOKENS];
+ size_t ntokens;
+ int comm;
+
+ if (settings.verbose > 1)
+ fprintf(stderr, "<%d %s\n", c->sfd, command);
+
+ /*
+ * for commands set/add/replace, we build an item and read the data
+ * directly into it, then continue in nread_complete().
+ */
+
+ c->msgcurr = 0;
+ c->msgused = 0;
+ c->iovused = 0;
+ if (add_msghdr(c)) {
+ out_string(c, "SERVER_ERROR out of memory");
+ return;
+ }
+
+ ntokens = tokenize_command(command, tokens, MAX_TOKENS);
+
+ if (ntokens >= 3 &&
+ ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
+ (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
+
+ process_get_command(c, tokens, ntokens);
+
+ } else if (ntokens == 6 &&
+ ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
+ (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
+ (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) {
+
+ process_update_command(c, tokens, ntokens, comm);
+
+ } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
+
+ process_arithmetic_command(c, tokens, ntokens, 1);
+
+ } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
+
+ process_arithmetic_command(c, tokens, ntokens, 0);
+
+ } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
+
+ process_delete_command(c, tokens, ntokens);
+
+ } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
+ unsigned int bucket, gen;
if (!settings.managed) {
out_string(c, "CLIENT_ERROR not a managed instance");
return;
}
- if (sscanf(start, "%u:%u\r\n", &bucket,&gen) == 2) {
+
+ if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
out_string(c, "CLIENT_ERROR bucket number out of range");
return;
@@ -1101,16 +1289,15 @@ void process_command(conn *c, char *command) {
out_string(c, "CLIENT_ERROR bad format");
return;
}
- }
- if (strncmp(command, "disown ", 7) == 0) {
+ } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
+
int bucket;
- char *start = command+7;
if (!settings.managed) {
out_string(c, "CLIENT_ERROR not a managed instance");
return;
}
- if (sscanf(start, "%u\r\n", &bucket) == 1) {
+ if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
out_string(c, "CLIENT_ERROR bucket number out of range");
return;
@@ -1122,16 +1309,14 @@ void process_command(conn *c, char *command) {
out_string(c, "CLIENT_ERROR bad format");
return;
}
- }
- if (strncmp(command, "bg ", 3) == 0) {
+ } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
int bucket, gen;
- char *start = command+3;
if (!settings.managed) {
out_string(c, "CLIENT_ERROR not a managed instance");
return;
}
- if (sscanf(start, "%u:%u\r\n", &bucket,&gen) == 2) {
+ if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
/* we never write anything back, even if input's wrong */
if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen<=0)) {
/* do nothing, bad input */
@@ -1145,28 +1330,25 @@ void process_command(conn *c, char *command) {
out_string(c, "CLIENT_ERROR bad format");
return;
}
- }
- if (strncmp(command, "stats", 5) == 0) {
- process_stat(c, command);
- return;
- }
+ } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
+
+ process_stat(c, tokens, ntokens);
- if (strncmp(command, "flush_all", 9) == 0) {
+ } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
time_t exptime = 0;
- int res;
set_current_time();
- if (strcmp(command, "flush_all") == 0) {
+ if(ntokens == 2) {
settings.oldest_live = current_time - 1;
item_flush_expired();
out_string(c, "OK");
return;
}
- res = sscanf(command, "%*s %ld", &exptime);
- if (res != 1) {
- out_string(c, "ERROR");
+ exptime = strtol(tokens[1].value, NULL, 10);
+ if(errno == ERANGE) {
+ out_string(c, "CLIENT_ERROR bad command line format");
return;
}
@@ -1174,45 +1356,49 @@ void process_command(conn *c, char *command) {
item_flush_expired();
out_string(c, "OK");
return;
- }
+
+ } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
- if (strcmp(command, "version") == 0) {
out_string(c, "VERSION " VERSION);
- return;
- }
- if (strcmp(command, "quit") == 0) {
- conn_set_state(c, conn_closing);
- return;
- }
+ } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
- if (strncmp(command, "slabs reassign ", 15) == 0) {
+ conn_set_state(c, conn_closing);
+
+ } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
+ strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
#ifdef ALLOW_SLABS_REASSIGN
- int src, dst;
- char *start = command+15;
- if (sscanf(start, "%u %u\r\n", &src, &dst) == 2) {
- int rv = slabs_reassign(src, dst);
- if (rv == 1) {
- out_string(c, "DONE");
- return;
- }
- if (rv == 0) {
- out_string(c, "CANT");
- return;
- }
- if (rv == -1) {
- out_string(c, "BUSY");
- return;
- }
+
+ int src, dst, rv;
+
+ src = strtol(tokens[2].value, NULL, 10);
+ dst = strtol(tokens[3].value, NULL, 10);
+
+ if(errno == ERANGE) {
+ out_string(c, "CLIENT_ERROR bad command line format");
+ return;
+ }
+
+ rv = slabs_reassign(src, dst);
+ if (rv == 1) {
+ out_string(c, "DONE");
+ return;
+ }
+ if (rv == 0) {
+ out_string(c, "CANT");
+ return;
+ }
+ if (rv == -1) {
+ out_string(c, "BUSY");
+ return;
}
- out_string(c, "CLIENT_ERROR bogus command");
#else
out_string(c, "CLIENT_ERROR Slab reassignment not supported");
#endif
- return;
- }
- out_string(c, "ERROR");
+ } else {
+ out_string(c, "ERROR");
+ }
return;
}
@@ -1222,6 +1408,8 @@ void process_command(conn *c, char *command) {
int try_read_command(conn *c) {
char *el, *cont;
+ assert(c->rcurr <= c->rbuf + c->rsize);
+
if (!c->rbytes)
return 0;
el = memchr(c->rcurr, '\n', c->rbytes);
@@ -1233,11 +1421,15 @@ int try_read_command(conn *c) {
}
*el = '\0';
+ assert(cont <= c->rcurr + c->rbytes);
+
process_command(c, c->rcurr);
c->rbytes -= (cont - c->rcurr);
c->rcurr = cont;
+ assert(c->rcurr <= c->rbuf + c->rsize);
+
return 1;
}
@@ -1347,6 +1539,25 @@ int update_event(conn *c, int new_flags) {
}
/*
+ * Sets whether we are listening for new connections or not.
+ */
+void accept_new_conns(int do_accept) {
+ if (do_accept) {
+ update_event(listen_conn, EV_READ | EV_PERSIST);
+ if (listen(listen_conn->sfd, 1024)) {
+ perror("listen");
+ }
+ }
+ else {
+ update_event(listen_conn, 0);
+ if (listen(listen_conn->sfd, 0)) {
+ perror("listen");
+ }
+ }
+}
+
+
+/*
* Transmit the next chunk of data from our list of msgbuf structures.
*
* Returns:
@@ -1426,6 +1637,10 @@ void drive_machine(conn *c) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
stop = 1;
break;
+ } else if (errno == EMFILE) {
+ if (settings.verbose > 0)
+ fprintf(stderr, "Too many open connections\n");
+ accept_new_conns(0);
} else {
perror("accept()");
}
@@ -1561,7 +1776,7 @@ void drive_machine(conn *c) {
*/
if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
if (add_iov(c, c->wcurr, c->wbytes) ||
- c->udp && build_udp_headers(c)) {
+ (c->udp && build_udp_headers(c))) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't build response\n");
conn_set_state(c, conn_closing);
@@ -1665,7 +1880,7 @@ int new_socket(int is_udp) {
*/
void maximize_sndbuf(int sfd) {
socklen_t intsize = sizeof(int);
- int last_good;
+ int last_good = 0;
int min, max, avg;
int old_size;
@@ -1867,7 +2082,6 @@ void delete_handler(int fd, short which, void *arg) {
{
int i, j=0;
- rel_time_t now = current_time;
for (i=0; i<delcurr; i++) {
item *it = todelete[i];
if (item_delete_lock_over(it)) {
@@ -2015,7 +2229,6 @@ void sig_handler(int sig) {
int main (int argc, char **argv) {
int c;
- conn *l_conn;
conn *u_conn;
struct in_addr addr;
int lock_memory = 0;
@@ -2260,7 +2473,7 @@ int main (int argc, char **argv) {
exit(1);
}
/* create the initial listening connection */
- if (!(l_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST, 1, 0))) {
+ if (!(listen_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST, 1, 0))) {
fprintf(stderr, "failed to create listening connection");
exit(1);
}
diff --git a/memcached.h b/memcached.h
index b7c2ccd..4fb2a53 100644
--- a/memcached.h
+++ b/memcached.h
@@ -85,9 +85,9 @@ typedef struct _stritem {
#define ITEM_key(item) ((char*)&((item)->end[0]))
/* warning: don't use these macros with a function, as it evals its arg twice */
-#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey)
-#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + (item)->nsuffix)
-#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + (item)->nsuffix + (item)->nbytes)
+#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey + 1)
+#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 + (item)->nsuffix)
+#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 + (item)->nsuffix + (item)->nbytes)
enum conn_states {
conn_listening, /* the socket which listens for connections */
@@ -230,12 +230,14 @@ char* slabs_stats(int *buflen);
0 = fail
-1 = tried. busy. send again shortly. */
int slabs_reassign(unsigned char srcid, unsigned char dstid);
+int slabs_newslab(unsigned int id);
/* event handling, network IO */
void event_handler(int fd, short which, void *arg);
conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp);
void conn_close(conn *c);
void conn_init(void);
+void accept_new_conns(int do_accept);
void drive_machine(conn *c);
int new_socket(int isUdp);
int server_socket(int port, int isUdp);
@@ -256,13 +258,16 @@ void stats_init(void);
void settings_init(void);
/* associative array */
void assoc_init(void);
-item *assoc_find(char *key);
-int assoc_insert(char *key, item *item);
-void assoc_delete(char *key);
+item *assoc_find(const char *key, size_t nkey);
+int assoc_insert(item *item);
+void assoc_delete(const char *key, size_t nkey);
+void assoc_move_next_bucket(void);
+
void item_init(void);
-item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes);
+item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
void item_free(item *it);
-int item_size_ok(char *key, int flags, int nbytes);
+int item_size_ok(char *key, size_t nkey, int flags, int nbytes);
+
int item_link(item *it); /* may fail if transgresses limits */
void item_unlink(item *it);
void item_remove(item *it);