diff options
author | Steven Grimm <sgrimm@facebook.com> | 2006-11-26 21:33:47 +0000 |
---|---|---|
committer | Steven Grimm <sgrimm@facebook.com> | 2006-11-26 21:33:47 +0000 |
commit | 217dcce072c268a9f404c3dcda89aa55c1a7423f (patch) | |
tree | 6a4063bfdf2cc3c073a3f0a31601c057ceb4f1cb | |
parent | 117ca7fc923dfac10d033dec47175eef09817926 (diff) | |
download | memcached-217dcce072c268a9f404c3dcda89aa55c1a7423f.tar.gz |
Incorporate changes from "performance" branch (revisions 414-419).
git-svn-id: http://code.sixapart.com/svn/memcached/trunk/server@450 b0b603af-a30f-0410-a34e-baf09ae79d0b
-rw-r--r-- | ChangeLog | 18 | ||||
-rw-r--r-- | assoc.c | 161 | ||||
-rw-r--r-- | items.c | 57 | ||||
-rw-r--r-- | memcached.c | 797 | ||||
-rw-r--r-- | memcached.h | 21 |
5 files changed, 699 insertions, 355 deletions
@@ -1,3 +1,21 @@ +2006-11-26 + * Steven Grimm <sgrimm@facebook.com>: Performance improvements: + + Dynamic sizing of hashtable to reduce collisions on very large + caches and conserve memory on small caches. + + Only reposition items in the LRU queue once a minute, to reduce + overhead of accessing extremely frequently-used items. + + Stop listening for new connections until an existing one closes + if we run out of available file descriptors. + + Command parser refactoring: Add a single-pass tokenizer to cut + down on string scanning. Split the command processing into + separate functions for easier profiling and better readability. + Pass key lengths along with the keys in all API functions that + need keys, to avoid needing to call strlen() repeatedly. + 2006-11-25 * Steve Peters <steve@fisharerojo.org>: OpenBSD has a malloc.h, but warns to use stdlib.h instead @@ -31,15 +31,6 @@ #include "memcached.h" -typedef unsigned long int ub4; /* unsigned 4-byte quantities */ -typedef unsigned char ub1; /* unsigned 1-byte quantities */ - -/* hard-code one million buckets, for now (2**20 == 4MB hash) */ -#define HASHPOWER 20 - -#define hashsize(n) ((ub4)1<<(n)) -#define hashmask(n) (hashsize(n)-1) - /* * Since the hash function does bit manipulation, it needs to know * whether it's big or little-endian. ENDIAN_LITTLE and ENDIAN_BIG @@ -459,25 +450,64 @@ uint32_t hash( const void *key, size_t length, uint32_t initval) #error Must define HASH_BIG_ENDIAN or HASH_LITTLE_ENDIAN #endif // hash_XXX_ENDIAN == 1 -static item** hashtable = 0; +typedef unsigned long int ub4; /* unsigned 4-byte quantities */ +typedef unsigned char ub1; /* unsigned 1-byte quantities */ + +/* how many powers of 2's worth of buckets we use */ +int hashpower = 16; + +#define hashsize(n) ((ub4)1<<(n)) +#define hashmask(n) (hashsize(n)-1) + +/* Main hash table. This is where we look except during expansion. */ +static item** primary_hashtable = 0; + +/* + * Previous hash table. During expansion, we look here for keys that haven't + * been moved over to the primary yet. + */ +static item** old_hashtable = 0; + +/* Number of items in the hash table. */ +static int hash_items = 0; + +/* Flag: Are we in the middle of expanding now? */ +static int expanding = 0; + +/* + * During expansion we migrate values with bucket granularity; this is how + * far we've gotten so far. Ranges from 0 .. hashsize(hashpower - 1) - 1. + */ +static int expand_bucket = 0; void assoc_init(void) { - unsigned int hash_size = hashsize(HASHPOWER) * sizeof(void*); - hashtable = malloc(hash_size); - if (! hashtable) { + unsigned int hash_size = hashsize(hashpower) * sizeof(void*); + primary_hashtable = malloc(hash_size); + if (! primary_hashtable) { fprintf(stderr, "Failed to init hashtable.\n"); exit(1); } - memset(hashtable, 0, hash_size); + memset(primary_hashtable, 0, hash_size); } -item *assoc_find(char *key) { - ub4 hv = hash(key, strlen(key), 0) & hashmask(HASHPOWER); - item *it = hashtable[hv]; +item *assoc_find(const char *key, size_t nkey) { + uint32_t hv = hash(key, nkey, 0); + item *it; + int oldbucket; + + if (expanding && + (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket) + { + it = old_hashtable[oldbucket]; + } else { + it = primary_hashtable[hv & hashmask(hashpower)]; + } while (it) { - if (strcmp(key, ITEM_key(it)) == 0) + if ((nkey == it->nkey) && + (memcmp(key, ITEM_key(it), nkey) == 0)) { return it; + } it = it->h_next; } return 0; @@ -486,35 +516,104 @@ item *assoc_find(char *key) { /* returns the address of the item pointer before the key. if *item == 0, the item wasn't found */ -static item** _hashitem_before (char *key) { - ub4 hv = hash(key, strlen(key), 0) & hashmask(HASHPOWER); - item **pos = &hashtable[hv]; +static item** _hashitem_before (const char *key, size_t nkey) { + uint32_t hv = hash(key, nkey, 0); + item **pos; + int oldbucket; - while (*pos && strcmp(key, ITEM_key(*pos))) { + if (expanding && + (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket) + { + pos = &old_hashtable[oldbucket]; + } else { + pos = &primary_hashtable[hv & hashmask(hashpower)]; + } + + while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) { pos = &(*pos)->h_next; } return pos; } +/* grows the hashtable to the next power of 2. */ +static void assoc_expand(void) { + old_hashtable = primary_hashtable; + + primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *)); + if (primary_hashtable) { + if (settings.verbose > 1) + fprintf(stderr, "Hash table expansion starting\n"); + hashpower++; + expanding = 1; + expand_bucket = 0; + assoc_move_next_bucket(); + } else { + primary_hashtable = old_hashtable; + /* Bad news, but we can keep running. */ + } +} + +/* migrates the next bucket to the primary hashtable if we're expanding. */ +void assoc_move_next_bucket(void) { + item *it, *next; + int bucket; + + if (expanding) { + for (it = old_hashtable[expand_bucket]; NULL != it; it = next) { + next = it->h_next; + + bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower); + it->h_next = primary_hashtable[bucket]; + primary_hashtable[bucket] = it; + } + + expand_bucket++; + if (expand_bucket == hashsize(hashpower - 1)) { + expanding = 0; + free(old_hashtable); + if (settings.verbose > 1) + fprintf(stderr, "Hash table expansion done\n"); + } + } +} + /* Note: this isn't an assoc_update. The key must not already exist to call this */ -int assoc_insert(char *key, item *it) { - ub4 hv; - assert(assoc_find(key) == 0); /* shouldn't have duplicately named things defined */ - hv = hash(key, strlen(key), 0) & hashmask(HASHPOWER); - it->h_next = hashtable[hv]; - hashtable[hv] = it; +int assoc_insert(item *it) { + uint32_t hv; + int oldbucket; + + assert(assoc_find(ITEM_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */ + + hv = hash(ITEM_key(it), it->nkey, 0); + if (expanding && + (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket) + { + it->h_next = old_hashtable[oldbucket]; + old_hashtable[oldbucket] = it; + } else { + it->h_next = primary_hashtable[hv & hashmask(hashpower)]; + primary_hashtable[hv & hashmask(hashpower)] = it; + } + + hash_items++; + if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) { + assoc_expand(); + } + return 1; } -void assoc_delete(char *key) { - item **before = _hashitem_before(key); +void assoc_delete(const char *key, size_t nkey) { + item **before = _hashitem_before(key, nkey); + if (*before) { item *nxt = (*before)->h_next; (*before)->h_next = 0; /* probably pointless, but whatever. */ *before = nxt; + hash_items--; return; } - /* Note: we never actually get here. the callers don't delete things + /* Note: we never actually get here. the callers don't delete things they can't find. */ assert(*before != 0); } @@ -19,6 +19,12 @@ #include "memcached.h" +/* + * We only reposition items in the LRU queue if they haven't been repositioned + * in this many seconds. That saves us from churning on frequently-accessed + * items. + */ +#define ITEM_UPDATE_INTERVAL 60 #define LARGEST_ID 255 static item *heads[LARGEST_ID]; @@ -38,28 +44,29 @@ void item_init(void) { /* * Generates the variable-sized part of the header for an object. * + * key - The key + * nkey - The length of the key + * flags - key flags + * nbytes - Number of bytes to hold value and addition CRLF terminator * suffix - Buffer for the "VALUE" line suffix (flags, size). * nsuffix - The length of the suffix is stored here. - * keylen - The length of the key plus any padding required to word-align the - * "VALUE" suffix (which is done to speed up copying.) * * Returns the total size of the header. */ -int item_make_header(char *key, int flags, int nbytes, - char *suffix, int *nsuffix, int *keylen) { - *keylen = strlen(key) + 1; if(*keylen % 4) *keylen += 4 - (*keylen % 4); +int item_make_header(char *key, uint8_t nkey, int flags, int nbytes, + char *suffix, int *nsuffix) { *nsuffix = sprintf(suffix, " %u %u\r\n", flags, nbytes - 2); - return sizeof(item) + *keylen + *nsuffix + nbytes; + return sizeof(item) + nkey + *nsuffix + nbytes; } - -item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) { - int nsuffix, ntotal, len; + +item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) { + int nsuffix, ntotal; item *it; unsigned int id; char suffix[40]; - ntotal = item_make_header(key, flags, nbytes, suffix, &nsuffix, &len); - + ntotal = item_make_header(key, nkey + 1, flags, nbytes, suffix, &nsuffix); + id = slabs_clsid(ntotal); if (id == 0) return 0; @@ -75,8 +82,8 @@ item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) { if (!settings.evict_to_free) return 0; - /* - * try to get one off the right LRU + /* + * try to get one off the right LRU * don't necessariuly unlink the tail because it may be locked: refcount>0 * search up from tail an item with refcount==0 and unlink it; give up after 50 * tries @@ -104,7 +111,7 @@ item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) { it->next = it->prev = it->h_next = 0; it->refcount = 0; it->it_flags = 0; - it->nkey = len; + it->nkey = nkey; it->nbytes = nbytes; strcpy(ITEM_key(it), key); it->exptime = exptime; @@ -130,12 +137,12 @@ void item_free(item *it) { * Returns true if an item will fit in the cache (its size does not exceed * the maximum for a cache entry.) */ -int item_size_ok(char *key, int flags, int nbytes) { +int item_size_ok(char *key, size_t nkey, int flags, int nbytes) { char prefix[40]; - int keylen, nsuffix; + int nsuffix; - return slabs_clsid(item_make_header(key, flags, nbytes, - prefix, &nsuffix, &keylen)) != 0; + return slabs_clsid(item_make_header(key, nkey + 1, flags, nbytes, + prefix, &nsuffix)) != 0; } void item_link_q(item *it) { /* item is the new head */ @@ -184,7 +191,7 @@ int item_link(item *it) { assert(it->nbytes < 1048576); it->it_flags |= ITEM_LINKED; it->time = current_time; - assoc_insert(ITEM_key(it), it); + assoc_insert(it); stats.curr_bytes += ITEM_ntotal(it); stats.curr_items += 1; @@ -200,7 +207,7 @@ void item_unlink(item *it) { it->it_flags &= ~ITEM_LINKED; stats.curr_bytes -= ITEM_ntotal(it); stats.curr_items -= 1; - assoc_delete(ITEM_key(it)); + assoc_delete(ITEM_key(it), it->nkey); item_unlink_q(it); } if (it->refcount == 0) item_free(it); @@ -216,11 +223,13 @@ void item_remove(item *it) { } void item_update(item *it) { - assert((it->it_flags & ITEM_SLABBED) == 0); + if (it->time < current_time - ITEM_UPDATE_INTERVAL) { + assert((it->it_flags & ITEM_SLABBED) == 0); - item_unlink_q(it); - it->time = current_time; - item_link_q(it); + item_unlink_q(it); + it->time = current_time; + item_link_q(it); + } } int item_replace(item *it, item *new_it) { diff --git a/memcached.c b/memcached.c index 47c8809..53580af 100644 --- a/memcached.c +++ b/memcached.c @@ -72,6 +72,7 @@ struct settings settings; static item **todelete = 0; static int delcurr; static int deltotal; +static conn *listen_conn; #define TRANSMIT_COMPLETE 0 #define TRANSMIT_INCOMPLETE 1 @@ -104,6 +105,7 @@ void stats_init(void) { values are now false in boolean context... */ stats.started = time(0) - 2; } + void stats_reset(void) { stats.total_items = stats.total_conns = 0; stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0; @@ -133,8 +135,8 @@ int item_delete_lock_over (item *it) { } /* wrapper around assoc_find which does the lazy expiration/deletion logic */ -item *get_item_notedeleted(char *key, int *delete_locked) { - item *it = assoc_find(key); +item *get_item_notedeleted(char *key, size_t nkey, int *delete_locked) { + item *it = assoc_find(key, nkey); if (delete_locked) *delete_locked = 0; if (it && (it->it_flags & ITEM_DELETED)) { /* it's flagged as delete-locked. let's see if that condition @@ -157,8 +159,8 @@ item *get_item_notedeleted(char *key, int *delete_locked) { return it; } -item *get_item(char *key) { - return get_item_notedeleted(key, 0); +item *get_item(char *key, size_t nkey) { + return get_item_notedeleted(key, nkey, 0); } /* @@ -357,6 +359,7 @@ void conn_close(conn *c) { fprintf(stderr, "<%d connection closed.\n", c->sfd); close(c->sfd); + accept_new_conns(1); conn_cleanup(c); /* if the connection has big buffers, just free it */ @@ -390,12 +393,12 @@ int do_realloc(void **orig, int newsize, int bytes_per_item, int *size) { if (newbuf) { *orig = newbuf; *size = newsize; - return 1; + return 1; } return 0; } - /* +/* * Shrinks a connection's buffers if they're too big. This prevents * periodic large "get" requests from permanently chewing lots of server * memory. @@ -408,7 +411,10 @@ void conn_shrink(conn *c) { return; if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) { - do_realloc((void **)&c->rbuf, DATA_BUFFER_SIZE, 1, &c->rsize); + if (c->rcurr != c->rbuf) + memmove(c->rbuf, c->rcurr, c->rbytes); + do_realloc((void **)&c->rbuf, DATA_BUFFER_SIZE, 1, &c->rsize); + c->rcurr = c->rbuf; } if (c->isize > ITEM_LIST_HIGHWAT) { @@ -433,6 +439,7 @@ void conn_set_state(conn *c, int state) { if (state != c->state) { if (state == conn_read) { conn_shrink(c); + assoc_move_next_bucket(); } c->state = state; } @@ -475,7 +482,6 @@ int ensure_iov_space(conn *c) { int add_iov(conn *c, const void *buf, int len) { struct msghdr *m; - int i; int leftover; int limit_to_mtu; @@ -490,7 +496,7 @@ int add_iov(conn *c, const void *buf, int len) { /* We may need to start a new msghdr if this one is full. */ if (m->msg_iovlen == IOV_MAX || - limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE) { + (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) { add_msghdr(c); m = &c->msglist[c->msgused - 1]; } @@ -602,7 +608,7 @@ void complete_nread(conn *c) { goto err; } - old_it = get_item_notedeleted(key, &delete_locked); + old_it = get_item_notedeleted(key, it->nkey, &delete_locked); if (old_it && comm == NREAD_ADD) { item_update(old_it); /* touches item, promotes to head of LRU */ @@ -625,7 +631,7 @@ void complete_nread(conn *c) { window... in which case we have to find the old hidden item that's in the namespace/LRU but wasn't returned by get_item.... because we need to replace it (below) */ - old_it = assoc_find(key); + old_it = assoc_find(key, it->nkey); } if (old_it) @@ -643,10 +649,94 @@ err: return; } -void process_stat(conn *c, char *command) { +typedef struct token_s { + char* value; + size_t length; +} token_t; + +#define COMMAND_TOKEN 0 +#define SUBCOMMAND_TOKEN 1 +#define KEY_TOKEN 1 +#define KEY_MAX_LENGTH 250 + +#define MAX_TOKENS 6 + +/* + * Tokenize the command string by replacing whitespace with '\0' and update + * the token array tokens with pointer to start of each token and length. + * Returns total number of tokens. The last valid token is the terminal + * token (value points to the first unprocessed character of the string and + * length zero). + * + * Usage example: + * + * while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) { + * for(int ix = 0; tokens[ix].length != 0; ix++) { + * ... + * } + * ncommand = tokens[ix].value - command; + * command = tokens[ix].value; + * } + */ +size_t tokenize_command(char* command, token_t* tokens, size_t max_tokens) { + char* cp; + char* value = NULL; + size_t length = 0; + size_t ntokens = 0; + + assert(command != NULL && tokens != NULL && max_tokens > 1); + + cp = command; + while(*cp != '\0' && ntokens < max_tokens - 1) { + if(*cp == ' ') { + // If we've accumulated a token, this is the end of it. + if(length > 0) { + tokens[ntokens].value = value; + tokens[ntokens].length = length; + ntokens++; + length = 0; + value = NULL; + } + *cp = '\0'; + } else { + if(length == 0) { + value = cp; + } + length++; + } + cp++; + } + + if(ntokens < max_tokens - 1 && length > 0) { + tokens[ntokens].value = value; + tokens[ntokens].length = length; + ntokens++; + } + + /* + * If we scanned the whole string, the terminal value pointer is null, + * otherwise it is the first unprocessed character. + */ + tokens[ntokens].value = *cp == '\0' ? NULL : cp; + tokens[ntokens].length = 0; + ntokens++; + + return ntokens; +} + +void process_stat(conn *c, token_t* tokens, size_t ntokens) { rel_time_t now = current_time; + char* command; + char* subcommand; - if (strcmp(command, "stats") == 0) { + if(ntokens < 2) { + out_string(c, "CLIENT_ERROR bad command line"); + return; + } + + command = tokens[COMMAND_TOKEN].value; + + if (ntokens == 2 && strcmp(command, "stats") == 0) { char temp[1024]; pid_t pid = getpid(); char *pos = temp; @@ -679,7 +769,9 @@ void process_stat(conn *c, char *command) { return; } - if (strcmp(command, "stats reset") == 0) { + subcommand = tokens[SUBCOMMAND_TOKEN].value; + + if (strcmp(subcommand, "reset") == 0) { stats_reset(); out_string(c, "RESET"); return; @@ -687,7 +779,7 @@ void process_stat(conn *c, char *command) { #ifdef HAVE_MALLOC_H #ifdef HAVE_STRUCT_MALLINFO - if (strcmp(command, "stats malloc") == 0) { + if (strcmp(subcommand, "malloc") == 0) { char temp[512]; struct mallinfo info; char *pos = temp; @@ -709,7 +801,7 @@ void process_stat(conn *c, char *command) { #endif /* HAVE_STRUCT_MALLINFO */ #endif /* HAVE_MALLOC_H */ - if (strcmp(command, "stats maps") == 0) { + if (strcmp(subcommand, "maps") == 0) { char *wbuf; int wsize = 8192; /* should be enough */ int fd; @@ -720,7 +812,7 @@ void process_stat(conn *c, char *command) { out_string(c, "SERVER_ERROR out of memory"); return; } - + fd = open("/proc/self/maps", O_RDONLY); if (fd == -1) { out_string(c, "SERVER_ERROR cannot open the maps file"); @@ -742,22 +834,31 @@ void process_stat(conn *c, char *command) { strcpy(wbuf + res, "END\r\n"); c->write_and_free=wbuf; c->wcurr=wbuf; - c->wbytes = res + 6; + c->wbytes = res + 5; // Don't write the terminal '\0' conn_set_state(c, conn_write); c->write_and_go = conn_read; close(fd); return; } - if (strncmp(command, "stats cachedump", 15) == 0) { + if (strcmp(subcommand, "cachedump") == 0) { + char *buf; unsigned int bytes, id, limit = 0; - char *start = command + 15; - if (sscanf(start, "%u %u\r\n", &id, &limit) < 1) { + + if(ntokens < 5) { out_string(c, "CLIENT_ERROR bad command line"); return; } + id = strtoul(tokens[2].value, NULL, 10); + limit = strtoul(tokens[3].value, NULL, 10); + + if(errno == ERANGE) { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + buf = item_cachedump(id, limit, &bytes); if (buf == 0) { out_string(c, "SERVER_ERROR out of memory"); @@ -772,7 +873,7 @@ void process_stat(conn *c, char *command) { return; } - if (strcmp(command, "stats slabs")==0) { + if (strcmp(subcommand, "slabs")==0) { int bytes = 0; char *buf = slabs_stats(&bytes); if (!buf) { @@ -787,14 +888,14 @@ void process_stat(conn *c, char *command) { return; } - if (strcmp(command, "stats items")==0) { + if (strcmp(subcommand, "items")==0) { char buffer[4096]; item_stats(buffer, 4096); out_string(c, buffer); return; } - if (strcmp(command, "stats sizes")==0) { + if (strcmp(subcommand, "sizes")==0) { int bytes = 0; char *buf = item_stats_sizes(&bytes); if (! buf) { @@ -813,171 +914,39 @@ void process_stat(conn *c, char *command) { out_string(c, "ERROR"); } -void process_command(conn *c, char *command) { - - int comm = 0; - int incr = 0; - - /* - * for commands set/add/replace, we build an item and read the data - * directly into it, then continue in nread_complete(). - */ - - if (settings.verbose > 1) - fprintf(stderr, "<%d %s\n", c->sfd, command); - - c->msgcurr = 0; - c->msgused = 0; - c->iovused = 0; - if (add_msghdr(c)) { - out_string(c, "SERVER_ERROR out of memory"); - return; - } - - if ((strncmp(command, "add ", 4) == 0 && (comm = NREAD_ADD)) || - (strncmp(command, "set ", 4) == 0 && (comm = NREAD_SET)) || - (strncmp(command, "replace ", 8) == 0 && (comm = NREAD_REPLACE))) { - - char key[251]; - int flags; - time_t expire; - int len, res; - item *it; - - res = sscanf(command, "%*s %250s %u %ld %d\n", key, &flags, &expire, &len); - if (res!=4 || strlen(key)==0 ) { - out_string(c, "CLIENT_ERROR bad command line format"); - return; - } - - if (settings.managed) { - int bucket = c->bucket; - if (bucket == -1) { - out_string(c, "CLIENT_ERROR no BG data in managed mode"); - return; - } - c->bucket = -1; - if (buckets[bucket] != c->gen) { - out_string(c, "ERROR_NOT_OWNER"); - return; - } - } - - expire = realtime(expire); - it = item_alloc(key, flags, expire, len+2); - - if (it == 0) { - if (! item_size_ok(key, flags, len + 2)) - out_string(c, "SERVER_ERROR object too large for cache"); - else - out_string(c, "SERVER_ERROR out of memory"); - /* swallow the data line */ - c->write_and_go = conn_swallow; - c->sbytes = len+2; - return; - } - - c->item_comm = comm; - c->item = it; - c->ritem = ITEM_data(it); - c->rlbytes = it->nbytes; - conn_set_state(c, conn_nread); - return; - } +inline void process_get_command(conn *c, token_t* tokens, size_t ntokens) { + char *key; + size_t nkey; + int i = 0; + item *it; + token_t* key_token = &tokens[KEY_TOKEN]; - if ((strncmp(command, "incr ", 5) == 0 && (incr = 1)) || - (strncmp(command, "decr ", 5) == 0)) { - char temp[32]; - unsigned int value; - item *it; - unsigned int delta; - char key[251]; - int res; - char *ptr; - res = sscanf(command, "%*s %250s %u\n", key, &delta); - if (res!=2 || strlen(key)==0 ) { - out_string(c, "CLIENT_ERROR bad command line format"); + if (settings.managed) { + int bucket = c->bucket; + if (bucket == -1) { + out_string(c, "CLIENT_ERROR no BG data in managed mode"); return; } - if (settings.managed) { - int bucket = c->bucket; - if (bucket == -1) { - out_string(c, "CLIENT_ERROR no BG data in managed mode"); - return; - } - c->bucket = -1; - if (buckets[bucket] != c->gen) { - out_string(c, "ERROR_NOT_OWNER"); - return; - } - } - it = get_item(key); - if (!it) { - out_string(c, "NOT_FOUND"); + c->bucket = -1; + if (buckets[bucket] != c->gen) { + out_string(c, "ERROR_NOT_OWNER"); return; } - ptr = ITEM_data(it); - while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++; // BUG: can't be true - value = atoi(ptr); - if (incr) - value+=delta; - else { - if (delta >= value) value = 0; - else value-=delta; - } - sprintf(temp, "%u", value); - res = strlen(temp); - if (res + 2 > it->nbytes) { /* need to realloc */ - item *new_it; - new_it = item_alloc(ITEM_key(it), atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 ); - if (new_it == 0) { - out_string(c, "SERVER_ERROR out of memory"); - return; - } - memcpy(ITEM_data(new_it), temp, res); - memcpy(ITEM_data(new_it) + res, "\r\n", 2); - item_replace(it, new_it); - } else { /* replace in-place */ - memcpy(ITEM_data(it), temp, res); - memset(ITEM_data(it) + res, ' ', it->nbytes-res-2); - } - out_string(c, temp); - return; } - if (strncmp(command, "bget ", 5) == 0) { - c->binary = 1; - goto get; - } - if (strncmp(command, "get ", 4) == 0) { - - char *start = command + 4; - char key[251]; - int next; - int i; - item *it; - rel_time_t now; - get: - now = current_time; - i = 0; - - if (settings.managed) { - int bucket = c->bucket; - if (bucket == -1) { - out_string(c, "CLIENT_ERROR no BG data in managed mode"); - return; - } - c->bucket = -1; - if (buckets[bucket] != c->gen) { - out_string(c, "ERROR_NOT_OWNER"); + do { + while(key_token->length != 0) { + + key = key_token->value; + nkey = key_token->length; + + if(nkey > KEY_MAX_LENGTH) { + out_string(c, "CLIENT_ERROR bad command line format"); return; } - } - - while(sscanf(start, " %250s%n", key, &next) >= 1) { - start+=next; + stats.get_cmds++; - it = get_item(key); + it = get_item(key, nkey); if (it) { if (i >= c->isize) { item **new_list = realloc(c->ilist, sizeof(item *)*c->isize*2); @@ -986,7 +955,7 @@ void process_command(conn *c, char *command) { c->ilist = new_list; } else break; } - + /* * Construct the response. Each hit adds three elements to the * outgoing data list: @@ -994,13 +963,12 @@ void process_command(conn *c, char *command) { * key * " " + flags + " " + data length + "\r\n" + data (with \r\n) */ - /* TODO: can we avoid the strlen() func call and cache that in wasted byte in item struct? */ if (add_iov(c, "VALUE ", 6) || - add_iov(c, ITEM_key(it), strlen(ITEM_key(it))) || + add_iov(c, ITEM_key(it), it->nkey) || add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes)) - { - break; - } + { + break; + } if (settings.verbose > 1) fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it)); @@ -1009,87 +977,307 @@ void process_command(conn *c, char *command) { item_update(it); *(c->ilist + i) = it; i++; + } else stats.get_misses++; + + key_token++; } - c->icurr = c->ilist; - c->ileft = i; + /* + * If the command string hasn't been fully processed, get the next set + * of tokens. + */ + if(key_token->value != NULL) { + ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS); + key_token = tokens; + } + + } while(key_token->value != NULL); - if (settings.verbose > 1) - fprintf(stderr, ">%d END\n", c->sfd); - add_iov(c, "END\r\n", 5); + c->icurr = c->ilist; + c->ileft = i; + + if (settings.verbose > 1) + fprintf(stderr, ">%d END\n", c->sfd); + add_iov(c, "END\r\n", 5); + + if (c->udp && build_udp_headers(c)) { + out_string(c, "SERVER_ERROR out of memory"); + } + else { + conn_set_state(c, conn_mwrite); + c->msgcurr = 0; + } + return; +} + +void process_update_command(conn *c, token_t* tokens, size_t ntokens, int comm) { + char *key; + size_t nkey; + int flags; + time_t exptime; + int vlen; + item *it; - if (c->udp && build_udp_headers(c)) { + if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + + key = tokens[KEY_TOKEN].value; + nkey = tokens[KEY_TOKEN].length; + + flags = strtoul(tokens[2].value, NULL, 10); + exptime = strtol(tokens[3].value, NULL, 10); + vlen = strtol(tokens[4].value, NULL, 10); + + if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + + if (settings.managed) { + int bucket = c->bucket; + if (bucket == -1) { + out_string(c, "CLIENT_ERROR no BG data in managed mode"); + return; + } + c->bucket = -1; + if (buckets[bucket] != c->gen) { + out_string(c, "ERROR_NOT_OWNER"); + return; + } + } + + it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2); + + if (it == 0) { + if (! item_size_ok(key, nkey, flags, vlen + 2)) + out_string(c, "SERVER_ERROR object too large for cache"); + else out_string(c, "SERVER_ERROR out of memory"); + /* swallow the data line */ + c->write_and_go = conn_swallow; + c->sbytes = vlen+2; + return; + } + + c->item_comm = comm; + c->item = it; + c->ritem = ITEM_data(it); + c->rlbytes = it->nbytes; + conn_set_state(c, conn_nread); + return; +} + +void process_arithmetic_command(conn *c, token_t* tokens, size_t ntokens, int incr) { + char temp[32]; + unsigned int value; + item *it; + unsigned int delta; + char *key; + size_t nkey; + int res; + char *ptr; + + if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + + key = tokens[KEY_TOKEN].value; + nkey = tokens[KEY_TOKEN].length; + + if (settings.managed) { + int bucket = c->bucket; + if (bucket == -1) { + out_string(c, "CLIENT_ERROR no BG data in managed mode"); + return; } - else { - conn_set_state(c, conn_mwrite); - c->msgcurr = 0; + c->bucket = -1; + if (buckets[bucket] != c->gen) { + out_string(c, "ERROR_NOT_OWNER"); + return; } + } + + it = get_item(key, nkey); + if (!it) { + out_string(c, "NOT_FOUND"); return; } - if (strncmp(command, "delete ", 7) == 0) { - char key[251]; - item *it; - int res; - time_t exptime = 0; + delta = strtoul(tokens[2].value, NULL, 10); + + if(errno == ERANGE) { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } - if (settings.managed) { - int bucket = c->bucket; - if (bucket == -1) { - out_string(c, "CLIENT_ERROR no BG data in managed mode"); - return; - } - c->bucket = -1; - if (buckets[bucket] != c->gen) { - out_string(c, "ERROR_NOT_OWNER"); - return; - } + ptr = ITEM_data(it); + while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++; // BUG: can't be true + + value = strtol(ptr, NULL, 10); + + if(errno == ERANGE) { + out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value"); + return; + } + + if (incr) + value+=delta; + else { + if (delta >= value) value = 0; + else value-=delta; + } + sprintf(temp, "%u", value); + res = strlen(temp); + if (res + 2 > it->nbytes) { /* need to realloc */ + item *new_it; + new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 ); + if (new_it == 0) { + out_string(c, "SERVER_ERROR out of memory"); + return; } - res = sscanf(command, "%*s %250s %ld", key, &exptime); - it = get_item(key); - if (!it) { - out_string(c, "NOT_FOUND"); + memcpy(ITEM_data(new_it), temp, res); + memcpy(ITEM_data(new_it) + res, "\r\n", 2); + item_replace(it, new_it); + } else { /* replace in-place */ + memcpy(ITEM_data(it), temp, res); + memset(ITEM_data(it) + res, ' ', it->nbytes-res-2); + } + out_string(c, temp); + return; +} + +void process_delete_command(conn *c, token_t* tokens, size_t ntokens) { + char *key; + size_t nkey; + item *it; + time_t exptime = 0; + + if (settings.managed) { + int bucket = c->bucket; + if (bucket == -1) { + out_string(c, "CLIENT_ERROR no BG data in managed mode"); return; } - if (exptime == 0) { - item_unlink(it); - out_string(c, "DELETED"); + c->bucket = -1; + if (buckets[bucket] != c->gen) { + out_string(c, "ERROR_NOT_OWNER"); return; } - if (delcurr >= deltotal) { - item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2); - if (new_delete) { - todelete = new_delete; - deltotal *= 2; - } else { - /* - * can't delete it immediately, user wants a delay, - * but we ran out of memory for the delete queue - */ - out_string(c, "SERVER_ERROR out of memory"); - return; - } + } + + key = tokens[KEY_TOKEN].value; + nkey = tokens[KEY_TOKEN].length; + + if(nkey > KEY_MAX_LENGTH) { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + + if(ntokens == 4) { + exptime = strtol(tokens[2].value, NULL, 10); + + if(errno == ERANGE) { + out_string(c, "CLIENT_ERROR bad command line format"); + return; } + } - it->refcount++; - /* use its expiration time as its deletion time now */ - it->exptime = realtime(exptime); - it->it_flags |= ITEM_DELETED; - todelete[delcurr++] = it; + it = get_item(key, nkey); + if (!it) { + out_string(c, "NOT_FOUND"); + return; + } + + if (exptime == 0) { + item_unlink(it); out_string(c, "DELETED"); return; } + if (delcurr >= deltotal) { + item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2); + if (new_delete) { + todelete = new_delete; + deltotal *= 2; + } else { + /* + * can't delete it immediately, user wants a delay, + * but we ran out of memory for the delete queue + */ + out_string(c, "SERVER_ERROR out of memory"); + return; + } + } + + it->refcount++; + /* use its expiration time as its deletion time now */ + it->exptime = realtime(exptime); + it->it_flags |= ITEM_DELETED; + todelete[delcurr++] = it; + out_string(c, "DELETED"); + return; +} - if (strncmp(command, "own ", 4) == 0) { - int bucket, gen; - char *start = command+4; +void process_command(conn *c, char *command) { + + token_t tokens[MAX_TOKENS]; + size_t ntokens; + int comm; + + if (settings.verbose > 1) + fprintf(stderr, "<%d %s\n", c->sfd, command); + + /* + * for commands set/add/replace, we build an item and read the data + * directly into it, then continue in nread_complete(). + */ + + c->msgcurr = 0; + c->msgused = 0; + c->iovused = 0; + if (add_msghdr(c)) { + out_string(c, "SERVER_ERROR out of memory"); + return; + } + + ntokens = tokenize_command(command, tokens, MAX_TOKENS); + + if (ntokens >= 3 && + ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) || + (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) { + + process_get_command(c, tokens, ntokens); + + } else if (ntokens == 6 && + ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) || + (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) || + (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) { + + process_update_command(c, tokens, ntokens, comm); + + } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) { + + process_arithmetic_command(c, tokens, ntokens, 1); + + } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) { + + process_arithmetic_command(c, tokens, ntokens, 0); + + } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) { + + process_delete_command(c, tokens, ntokens); + + } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) { + unsigned int bucket, gen; if (!settings.managed) { out_string(c, "CLIENT_ERROR not a managed instance"); return; } - if (sscanf(start, "%u:%u\r\n", &bucket,&gen) == 2) { + + if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) { if ((bucket < 0) || (bucket >= MAX_BUCKETS)) { out_string(c, "CLIENT_ERROR bucket number out of range"); return; @@ -1101,16 +1289,15 @@ void process_command(conn *c, char *command) { out_string(c, "CLIENT_ERROR bad format"); return; } - } - if (strncmp(command, "disown ", 7) == 0) { + } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) { + int bucket; - char *start = command+7; if (!settings.managed) { out_string(c, "CLIENT_ERROR not a managed instance"); return; } - if (sscanf(start, "%u\r\n", &bucket) == 1) { + if (sscanf(tokens[1].value, "%u", &bucket) == 1) { if ((bucket < 0) || (bucket >= MAX_BUCKETS)) { out_string(c, "CLIENT_ERROR bucket number out of range"); return; @@ -1122,16 +1309,14 @@ void process_command(conn *c, char *command) { out_string(c, "CLIENT_ERROR bad format"); return; } - } - if (strncmp(command, "bg ", 3) == 0) { + } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) { int bucket, gen; - char *start = command+3; if (!settings.managed) { out_string(c, "CLIENT_ERROR not a managed instance"); return; } - if (sscanf(start, "%u:%u\r\n", &bucket,&gen) == 2) { + if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) { /* we never write anything back, even if input's wrong */ if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen<=0)) { /* do nothing, bad input */ @@ -1145,28 +1330,25 @@ void process_command(conn *c, char *command) { out_string(c, "CLIENT_ERROR bad format"); return; } - } - if (strncmp(command, "stats", 5) == 0) { - process_stat(c, command); - return; - } + } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) { + + process_stat(c, tokens, ntokens); - if (strncmp(command, "flush_all", 9) == 0) { + } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) { time_t exptime = 0; - int res; set_current_time(); - if (strcmp(command, "flush_all") == 0) { + if(ntokens == 2) { settings.oldest_live = current_time - 1; item_flush_expired(); out_string(c, "OK"); return; } - res = sscanf(command, "%*s %ld", &exptime); - if (res != 1) { - out_string(c, "ERROR"); + exptime = strtol(tokens[1].value, NULL, 10); + if(errno == ERANGE) { + out_string(c, "CLIENT_ERROR bad command line format"); return; } @@ -1174,45 +1356,49 @@ void process_command(conn *c, char *command) { item_flush_expired(); out_string(c, "OK"); return; - } + + } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) { - if (strcmp(command, "version") == 0) { out_string(c, "VERSION " VERSION); - return; - } - if (strcmp(command, "quit") == 0) { - conn_set_state(c, conn_closing); - return; - } + } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) { - if (strncmp(command, "slabs reassign ", 15) == 0) { + conn_set_state(c, conn_closing); + + } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 && + strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) { #ifdef ALLOW_SLABS_REASSIGN - int src, dst; - char *start = command+15; - if (sscanf(start, "%u %u\r\n", &src, &dst) == 2) { - int rv = slabs_reassign(src, dst); - if (rv == 1) { - out_string(c, "DONE"); - return; - } - if (rv == 0) { - out_string(c, "CANT"); - return; - } - if (rv == -1) { - out_string(c, "BUSY"); - return; - } + + int src, dst, rv; + + src = strtol(tokens[2].value, NULL, 10); + dst = strtol(tokens[3].value, NULL, 10); + + if(errno == ERANGE) { + out_string(c, "CLIENT_ERROR bad command line format"); + return; + } + + rv = slabs_reassign(src, dst); + if (rv == 1) { + out_string(c, "DONE"); + return; + } + if (rv == 0) { + out_string(c, "CANT"); + return; + } + if (rv == -1) { + out_string(c, "BUSY"); + return; } - out_string(c, "CLIENT_ERROR bogus command"); #else out_string(c, "CLIENT_ERROR Slab reassignment not supported"); #endif - return; - } - out_string(c, "ERROR"); + } else { + out_string(c, "ERROR"); + } return; } @@ -1222,6 +1408,8 @@ void process_command(conn *c, char *command) { int try_read_command(conn *c) { char *el, *cont; + assert(c->rcurr <= c->rbuf + c->rsize); + if (!c->rbytes) return 0; el = memchr(c->rcurr, '\n', c->rbytes); @@ -1233,11 +1421,15 @@ int try_read_command(conn *c) { } *el = '\0'; + assert(cont <= c->rcurr + c->rbytes); + process_command(c, c->rcurr); c->rbytes -= (cont - c->rcurr); c->rcurr = cont; + assert(c->rcurr <= c->rbuf + c->rsize); + return 1; } @@ -1347,6 +1539,25 @@ int update_event(conn *c, int new_flags) { } /* + * Sets whether we are listening for new connections or not. + */ +void accept_new_conns(int do_accept) { + if (do_accept) { + update_event(listen_conn, EV_READ | EV_PERSIST); + if (listen(listen_conn->sfd, 1024)) { + perror("listen"); + } + } + else { + update_event(listen_conn, 0); + if (listen(listen_conn->sfd, 0)) { + perror("listen"); + } + } +} + + +/* * Transmit the next chunk of data from our list of msgbuf structures. * * Returns: @@ -1426,6 +1637,10 @@ void drive_machine(conn *c) { if (errno == EAGAIN || errno == EWOULDBLOCK) { stop = 1; break; + } else if (errno == EMFILE) { + if (settings.verbose > 0) + fprintf(stderr, "Too many open connections\n"); + accept_new_conns(0); } else { perror("accept()"); } @@ -1561,7 +1776,7 @@ void drive_machine(conn *c) { */ if (c->iovused == 0 || (c->udp && c->iovused == 1)) { if (add_iov(c, c->wcurr, c->wbytes) || - c->udp && build_udp_headers(c)) { + (c->udp && build_udp_headers(c))) { if (settings.verbose > 0) fprintf(stderr, "Couldn't build response\n"); conn_set_state(c, conn_closing); @@ -1665,7 +1880,7 @@ int new_socket(int is_udp) { */ void maximize_sndbuf(int sfd) { socklen_t intsize = sizeof(int); - int last_good; + int last_good = 0; int min, max, avg; int old_size; @@ -1867,7 +2082,6 @@ void delete_handler(int fd, short which, void *arg) { { int i, j=0; - rel_time_t now = current_time; for (i=0; i<delcurr; i++) { item *it = todelete[i]; if (item_delete_lock_over(it)) { @@ -2015,7 +2229,6 @@ void sig_handler(int sig) { int main (int argc, char **argv) { int c; - conn *l_conn; conn *u_conn; struct in_addr addr; int lock_memory = 0; @@ -2260,7 +2473,7 @@ int main (int argc, char **argv) { exit(1); } /* create the initial listening connection */ - if (!(l_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST, 1, 0))) { + if (!(listen_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST, 1, 0))) { fprintf(stderr, "failed to create listening connection"); exit(1); } diff --git a/memcached.h b/memcached.h index b7c2ccd..4fb2a53 100644 --- a/memcached.h +++ b/memcached.h @@ -85,9 +85,9 @@ typedef struct _stritem { #define ITEM_key(item) ((char*)&((item)->end[0])) /* warning: don't use these macros with a function, as it evals its arg twice */ -#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey) -#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + (item)->nsuffix) -#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + (item)->nsuffix + (item)->nbytes) +#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey + 1) +#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 + (item)->nsuffix) +#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 + (item)->nsuffix + (item)->nbytes) enum conn_states { conn_listening, /* the socket which listens for connections */ @@ -230,12 +230,14 @@ char* slabs_stats(int *buflen); 0 = fail -1 = tried. busy. send again shortly. */ int slabs_reassign(unsigned char srcid, unsigned char dstid); +int slabs_newslab(unsigned int id); /* event handling, network IO */ void event_handler(int fd, short which, void *arg); conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp); void conn_close(conn *c); void conn_init(void); +void accept_new_conns(int do_accept); void drive_machine(conn *c); int new_socket(int isUdp); int server_socket(int port, int isUdp); @@ -256,13 +258,16 @@ void stats_init(void); void settings_init(void); /* associative array */ void assoc_init(void); -item *assoc_find(char *key); -int assoc_insert(char *key, item *item); -void assoc_delete(char *key); +item *assoc_find(const char *key, size_t nkey); +int assoc_insert(item *item); +void assoc_delete(const char *key, size_t nkey); +void assoc_move_next_bucket(void); + void item_init(void); -item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes); +item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes); void item_free(item *it); -int item_size_ok(char *key, int flags, int nbytes); +int item_size_ok(char *key, size_t nkey, int flags, int nbytes); + int item_link(item *it); /* may fail if transgresses limits */ void item_unlink(item *it); void item_remove(item *it); |