diff options
-rw-r--r-- | assoc.c | 23 | ||||
-rw-r--r-- | daemon.c | 7 | ||||
-rw-r--r-- | items.c | 52 | ||||
-rw-r--r-- | memcached.c | 258 | ||||
-rw-r--r-- | memcached.h | 45 | ||||
-rw-r--r-- | slabs.c | 59 |
6 files changed, 444 insertions, 0 deletions
@@ -27,13 +27,18 @@ #include <errno.h> #include <event.h> #include <assert.h> + #include "memcached.h" + typedef unsigned long int ub4; /* unsigned 4-byte quantities */ typedef unsigned char ub1; /* unsigned 1-byte quantities */ + /* hard-code one million buckets, for now (2**20 == 4MB hash) */ #define HASHPOWER 20 + #define hashsize(n) ((ub4)1<<(n)) #define hashmask(n) (hashsize(n)-1) + #define mix(a,b,c) \ { \ a -= b; a -= c; a ^= (c>>13); \ @@ -46,6 +51,7 @@ typedef unsigned char ub1; /* unsigned 1-byte quantities */ b -= c; b -= a; b ^= (a<<10); \ c -= a; c -= b; c ^= (b>>15); \ } + /* -------------------------------------------------------------------- hash() -- hash a variable-length key into a 32-bit value @@ -55,30 +61,37 @@ hash() -- hash a variable-length key into a 32-bit value Returns a 32-bit value. Every bit of the key affects every bit of the return value. Every 1-bit and 2-bit delta achieves avalanche. About 6*len+35 instructions. + The best hash table sizes are powers of 2. There is no need to do mod a prime (mod is sooo slow!). If you need less than 32 bits, use a bitmask. For example, if you need only 10 bits, do h = (h & hashmask(10)); In which case, the hash table should have hashsize(10) elements. + If you are hashing n strings (ub1 **)k, do it like this: for (i=0, h=0; i<n; ++i) h = hash( k[i], len[i], h); + By Bob Jenkins, 1996. bob_jenkins@burtleburtle.net. You may use this code any way you wish, private, educational, or commercial. It's free. + See http://burtleburtle.net/bob/hash/evahash.html Use for hash table lookup, or anything where one collision in 2^^32 is acceptable. Do NOT use for cryptographic purposes. -------------------------------------------------------------------- */ + ub4 hash( k, length, initval) register ub1 *k; /* the key */ register ub4 length; /* the length of the key */ register ub4 initval; /* the previous hash, or an arbitrary value */ { register ub4 a,b,c,len; + /* Set up the internal state */ len = length; a = b = 0x9e3779b9; /* the golden ratio; an arbitrary value */ c = initval; /* the previous hash value */ + /*---------------------------------------- handle most of the key */ while (len >= 12) { @@ -88,6 +101,7 @@ ub4 hash( k, length, initval) mix(a,b,c); k += 12; len -= 12; } + /*------------------------------------- handle the last 11 bytes */ c += length; switch(len) /* all the case statements fall through */ @@ -110,7 +124,9 @@ ub4 hash( k, length, initval) /*-------------------------------------------- report the result */ return c; } + static item** hashtable = 0; + void assoc_init(void) { unsigned int hash_size = hashsize(HASHPOWER) * sizeof(void*); hashtable = malloc(hash_size); @@ -120,9 +136,11 @@ void assoc_init(void) { } memset(hashtable, 0, hash_size); } + item *assoc_find(char *key) { ub4 hv = hash(key, strlen(key), 0) & hashmask(HASHPOWER); item *it = hashtable[hv]; + while (it) { if (strcmp(key, ITEM_key(it)) == 0) return it; @@ -130,16 +148,20 @@ item *assoc_find(char *key) { } return 0; } + /* returns the address of the item pointer before the key. if *item == 0, the item wasn't found */ + static item** _hashitem_before (char *key) { ub4 hv = hash(key, strlen(key), 0) & hashmask(HASHPOWER); item **pos = &hashtable[hv]; + while (*pos && strcmp(key, ITEM_key(*pos))) { pos = &(*pos)->h_next; } return pos; } + /* Note: this isn't an assoc_update. The key must not already exist to call this */ int assoc_insert(char *key, item *it) { ub4 hv = hash(key, strlen(key), 0) & hashmask(HASHPOWER); @@ -147,6 +169,7 @@ int assoc_insert(char *key, item *it) { hashtable[hv] = it; return 1; } + void assoc_delete(char *key) { item **before = _hashitem_before(key); if (*before) { @@ -28,18 +28,22 @@ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ + #if defined __SUNPRO_C || defined __DECC || defined __HP_cc # pragma ident "@(#)$Header: /cvsroot/wikipedia/willow/src/bin/willow/daemon.c,v 1.1 2005/05/02 19:15:21 kateturner Exp $" # pragma ident "$NetBSD: daemon.c,v 1.9 2003/08/07 16:42:46 agc Exp $" #endif + #include <fcntl.h> #include <stdlib.h> #include <unistd.h> + int daemon(nochdir, noclose) int nochdir, noclose; { int fd; + switch (fork()) { case -1: return (-1); @@ -48,10 +52,13 @@ daemon(nochdir, noclose) default: _exit(0); } + if (setsid() == -1) return (-1); + if (!nochdir) (void)chdir("/"); + if (!noclose && (fd = open("/dev/null", O_RDWR, 0)) != -1) { (void)dup2(fd, STDIN_FILENO); (void)dup2(fd, STDOUT_FILENO); @@ -16,11 +16,15 @@ #include <time.h> #include <event.h> #include <assert.h> + #include "memcached.h" + + #define LARGEST_ID 255 static item *heads[LARGEST_ID]; static item *tails[LARGEST_ID]; unsigned int sizes[LARGEST_ID]; + void item_init(void) { int i; for(i=0; i<LARGEST_ID; i++) { @@ -29,6 +33,8 @@ void item_init(void) { sizes[i]=0; } } + + /* * Generates the variable-sized part of the header for an object. * @@ -45,31 +51,40 @@ int item_make_header(char *key, int flags, int nbytes, *nsuffix = sprintf(suffix, " %u %u\r\n", flags, nbytes - 2); return sizeof(item) + *keylen + *nsuffix + nbytes; } + item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) { int nsuffix, ntotal, len; item *it; unsigned int id; char suffix[40]; + ntotal = item_make_header(key, flags, nbytes, suffix, &nsuffix, &len); + id = slabs_clsid(ntotal); if (id == 0) return 0; + it = slabs_alloc(ntotal); if (it == 0) { int tries = 50; item *search; + /* If requested to not push old items out of cache when memory runs out, * we're out of luck at this point... */ + if (!settings.evict_to_free) return 0; + /* * try to get one off the right LRU * don't necessariuly unlink the tail because it may be locked: refcount>0 * search up from tail an item with refcount==0 and unlink it; give up after 50 * tries */ + if (id > LARGEST_ID) return 0; if (tails[id]==0) return 0; + for (search = tails[id]; tries>0 && search; tries--, search=search->prev) { if (search->refcount==0) { item_unlink(search); @@ -79,9 +94,13 @@ item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) { it = slabs_alloc(ntotal); if (it==0) return 0; } + assert(it->slabs_clsid == 0); + it->slabs_clsid = id; + assert(it != heads[it->slabs_clsid]); + it->next = it->prev = it->h_next = 0; it->refcount = 0; it->it_flags = 0; @@ -93,17 +112,20 @@ item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) { it->nsuffix = nsuffix; return it; } + void item_free(item *it) { unsigned int ntotal = ITEM_ntotal(it); assert((it->it_flags & ITEM_LINKED) == 0); assert(it != heads[it->slabs_clsid]); assert(it != tails[it->slabs_clsid]); assert(it->refcount == 0); + /* so slab size changer can tell later if item is already free or not */ it->slabs_clsid = 0; it->it_flags |= ITEM_SLABBED; slabs_free(it, ntotal); } + /* * Returns true if an item will fit in the cache (its size does not exceed * the maximum for a cache entry.) @@ -111,13 +133,16 @@ void item_free(item *it) { int item_size_ok(char *key, int flags, int nbytes) { char prefix[40]; int keylen, nsuffix; + return slabs_clsid(item_make_header(key, flags, nbytes, prefix, &nsuffix, &keylen)) != 0; } + void item_link_q(item *it) { /* item is the new head */ item **head, **tail; assert(it->slabs_clsid <= LARGEST_ID); assert((it->it_flags & ITEM_SLABBED) == 0); + head = &heads[it->slabs_clsid]; tail = &tails[it->slabs_clsid]; assert(it != *head); @@ -130,11 +155,13 @@ void item_link_q(item *it) { /* item is the new head */ sizes[it->slabs_clsid]++; return; } + void item_unlink_q(item *it) { item **head, **tail; assert(it->slabs_clsid <= LARGEST_ID); head = &heads[it->slabs_clsid]; tail = &tails[it->slabs_clsid]; + if (*head == it) { assert(it->prev == 0); *head = it->next; @@ -145,23 +172,29 @@ void item_unlink_q(item *it) { } assert(it->next != it); assert(it->prev != it); + if (it->next) it->next->prev = it->prev; if (it->prev) it->prev->next = it->next; sizes[it->slabs_clsid]--; return; } + int item_link(item *it) { assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0); assert(it->nbytes < 1048576); it->it_flags |= ITEM_LINKED; it->time = current_time; assoc_insert(ITEM_key(it), it); + stats.curr_bytes += ITEM_ntotal(it); stats.curr_items += 1; stats.total_items += 1; + item_link_q(it); + return 1; } + void item_unlink(item *it) { if (it->it_flags & ITEM_LINKED) { it->it_flags &= ~ITEM_LINKED; @@ -172,6 +205,7 @@ void item_unlink(item *it) { } if (it->refcount == 0) item_free(it); } + void item_remove(item *it) { assert((it->it_flags & ITEM_SLABBED) == 0); if (it->refcount) it->refcount--; @@ -180,18 +214,24 @@ void item_remove(item *it) { item_free(it); } } + void item_update(item *it) { assert((it->it_flags & ITEM_SLABBED) == 0); + item_unlink_q(it); it->time = current_time; item_link_q(it); } + int item_replace(item *it, item *new_it) { assert((it->it_flags & ITEM_SLABBED) == 0); + item_unlink(it); return item_link(new_it); } + char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) { + int memlimit = 2*1024*1024; char *buffer; int bufcurr; @@ -199,11 +239,14 @@ char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int int len; int shown = 0; char temp[512]; + if (slabs_clsid > LARGEST_ID) return 0; it = heads[slabs_clsid]; + buffer = malloc(memlimit); if (buffer == 0) return 0; bufcurr = 0; + while (it && (!limit || shown < limit)) { len = sprintf(temp, "ITEM %s [%u b; %lu s]\r\n", ITEM_key(it), it->nbytes - 2, it->time + stats.started); if (bufcurr + len + 6 > memlimit) /* 6 is END\r\n\0 */ @@ -213,19 +256,24 @@ char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int shown++; it = it->next; } + strcpy(buffer+bufcurr, "END\r\n"); bufcurr+=5; + *bytes = bufcurr; return buffer; } + void item_stats(char *buffer, int buflen) { int i; char *bufcurr = buffer; rel_time_t now = current_time; + if (buflen < 4096) { strcpy(buffer, "SERVER_ERROR out of memory"); return; } + for (i=0; i<LARGEST_ID; i++) { if (tails[i]) bufcurr += sprintf(bufcurr, "STAT items:%u:number %u\r\nSTAT items:%u:age %u\r\n", @@ -234,17 +282,20 @@ void item_stats(char *buffer, int buflen) { strcpy(bufcurr, "END"); return; } + /* dumps out a list of objects of each size, with granularity of 32 bytes */ char* item_stats_sizes(int *bytes) { int num_buckets = 32768; /* max 1MB object, divided into 32 bytes size buckets */ unsigned int *histogram = (unsigned int*) malloc(num_buckets * sizeof(int)); char *buf = (char*) malloc(1024*1024*2*sizeof(char)); int i; + if (histogram == 0 || buf == 0) { if (histogram) free(histogram); if (buf) free(buf); return 0; } + /* build the histogram */ memset(histogram, 0, num_buckets * sizeof(int)); for (i=0; i<LARGEST_ID; i++) { @@ -257,6 +308,7 @@ char* item_stats_sizes(int *bytes) { iter = iter->next; } } + /* write the buffer */ *bytes = 0; for (i=0; i<num_buckets; i++) { diff --git a/memcached.c b/memcached.c index 6a728fe..3601a42 100644 --- a/memcached.c +++ b/memcached.c @@ -47,30 +47,40 @@ #include <event.h> #include <assert.h> #include <limits.h> + #ifdef HAVE_MALLOC_H #include <malloc.h> #endif + #include "memcached.h" + struct stats stats; struct settings settings; + static item **todelete = 0; static int delcurr; static int deltotal; + #define TRANSMIT_COMPLETE 0 #define TRANSMIT_INCOMPLETE 1 #define TRANSMIT_SOFT_ERROR 2 #define TRANSMIT_HARD_ERROR 3 + int *buckets = 0; /* bucket->generation array for a managed instance */ + #define REALTIME_MAXDELTA 60*60*24*30 rel_time_t realtime(time_t exptime) { /* no. of seconds in 30 days - largest possible delta exptime */ + if (exptime == 0) return 0; /* 0 means never expire */ + if (exptime > REALTIME_MAXDELTA) return (rel_time_t) (exptime - stats.started); else { return (rel_time_t) (exptime + current_time); } } + void stats_init(void) { stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0; stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0; @@ -82,6 +92,7 @@ void stats_reset(void) { stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0; stats.bytes_read = stats.bytes_written = 0; } + void settings_init(void) { settings.port = 11211; settings.udpport = 11211; @@ -113,6 +124,7 @@ item *get_item(char *key) { } return it; } + /* * Adds a message header to a connection. * @@ -121,6 +133,7 @@ item *get_item(char *key) { int add_msghdr(conn *c) { struct msghdr *msg; + if (c->msgsize == c->msgused) { msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr)); if (! msg) @@ -128,6 +141,7 @@ int add_msghdr(conn *c) c->msglist = msg; c->msgsize *= 2; } + msg = c->msglist + c->msgused; msg->msg_iov = &c->iov[c->iovused]; msg->msg_iovlen = 0; @@ -138,24 +152,30 @@ int add_msghdr(conn *c) msg->msg_flags = 0; c->msgbytes = 0; c->msgused++; + if (c->udp) { /* Leave room for the UDP header, which we'll fill in later. */ return add_iov(c, NULL, UDP_HEADER_SIZE); } + return 0; } + conn **freeconns; int freetotal; int freecurr; + void conn_init(void) { freetotal = 200; freecurr = 0; freeconns = (conn **)malloc(sizeof (conn *)*freetotal); return; } + conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp) { conn *c; + /* do we have a free conn structure from a previous close? */ if (freecurr > 0) { c = freeconns[--freecurr]; @@ -169,17 +189,20 @@ conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, c->iov = 0; c->msglist = 0; c->hdrbuf = 0; + c->rsize = read_buffer_size; c->wsize = DATA_BUFFER_SIZE; c->isize = ITEM_LIST_INITIAL; c->iovsize = IOV_LIST_INITIAL; c->msgsize = MSG_LIST_INITIAL; c->hdrsize = 0; + c->rbuf = (char *) malloc(c->rsize); c->wbuf = (char *) malloc(c->wsize); c->ilist = (item **) malloc(sizeof(item *) * c->isize); c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize); c->msglist = (struct msghdr *) malloc(sizeof(struct msghdr) * c->msgsize); + if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 || c->msglist == 0) { if (c->rbuf != 0) free(c->rbuf); @@ -191,10 +214,13 @@ conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, perror("malloc()"); return 0; } + c->rsize = c->wsize = DATA_BUFFER_SIZE; c->isize = 200; /* TODO: another instance of '200'. must kill all these */ + stats.conn_structs++; } + if (settings.verbose > 1) { if (init_state == conn_listening) fprintf(stderr, "<%d server listening\n", sfd); @@ -203,6 +229,7 @@ conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, else fprintf(stderr, "<%d new client connection\n", sfd); } + c->sfd = sfd; c->udp = is_udp; c->state = init_state; @@ -216,13 +243,16 @@ conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, c->iovused = 0; c->msgcurr = 0; c->msgused = 0; + c->write_and_go = conn_read; c->write_and_free = 0; c->item = 0; c->bucket = -1; c->gen = 0; + event_set(&c->event, sfd, event_flags, event_handler, (void *)c); c->ev_flags = event_flags; + if (event_add(&c->event, 0) == -1) { if (freecurr < freetotal) { freeconns[freecurr++] = c; @@ -238,25 +268,31 @@ conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, } return 0; } + stats.curr_conns++; stats.total_conns++; + return c; } + void conn_cleanup(conn *c) { if (c->item) { item_free(c->item); c->item = 0; } + if (c->ileft) { for (; c->ileft > 0; c->ileft--,c->icurr++) { item_remove(*(c->icurr)); } } + if (c->write_and_free) { free(c->write_and_free); c->write_and_free = 0; } } + /* * Frees a connection. */ @@ -277,13 +313,17 @@ static void conn_free(conn *c) { free(c); } } + void conn_close(conn *c) { /* delete the event, the socket and the conn */ event_del(&c->event); + if (settings.verbose > 1) fprintf(stderr, "<%d connection closed.\n", c->sfd); + close(c->sfd); conn_cleanup(c); + /* if the connection has big buffers, just free it */ if (c->rsize > READ_BUFFER_HIGHWAT) { conn_free(c); @@ -301,9 +341,12 @@ void conn_close(conn *c) { conn_free(c); } } + stats.curr_conns--; + return; } + /* * Reallocates memory and updates a buffer size if successful. */ @@ -316,6 +359,7 @@ int do_realloc(void **orig, int newsize, int bytes_per_item, int *size) { } return 0; } + /* * Shrinks a connection's buffers if they're too big. This prevents * periodic large "get" requests from permanently chewing lots of server @@ -327,19 +371,24 @@ int do_realloc(void **orig, int newsize, int bytes_per_item, int *size) { void conn_shrink(conn *c) { if (c->udp) return; + if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) { do_realloc((void **)&c->rbuf, DATA_BUFFER_SIZE, 1, &c->rsize); } + if (c->isize > ITEM_LIST_HIGHWAT) { do_realloc((void **)&c->ilist, ITEM_LIST_INITIAL, sizeof(c->ilist[0]), &c->isize); } + if (c->msgsize > MSG_LIST_HIGHWAT) { do_realloc((void **)&c->msglist, MSG_LIST_INITIAL, sizeof(c->msglist[0]), &c->msgsize); } + if (c->iovsize > IOV_LIST_HIGHWAT) { do_realloc((void **)&c->iov, IOV_LIST_INITIAL, sizeof(c->iov[0]), &c->iovsize); } } + /* * Sets a connection's current state in the state machine. Any special * processing that needs to happen on certain state transitions can @@ -353,6 +402,8 @@ void conn_set_state(conn *c, int state) { c->state = state; } } + + /* * Ensures that there is room for another struct iovec in a connection's * iov list. @@ -368,40 +419,50 @@ int ensure_iov_space(conn *c) { return -1; c->iov = new_iov; c->iovsize *= 2; + /* Point all the msghdr structures at the new list. */ for (i = 0, iovnum = 0; i < c->msgused; i++) { c->msglist[i].msg_iov = &c->iov[iovnum]; iovnum += c->msglist[i].msg_iovlen; } } + return 0; } + + /* * Adds data to the list of pending data that will be written out to a * connection. * * Returns 0 on success, -1 on out-of-memory. */ + int add_iov(conn *c, const void *buf, int len) { struct msghdr *m; int i; int leftover; int limit_to_mtu; + do { m = &c->msglist[c->msgused - 1]; + /* * Limit UDP packets, and the first payloads of TCP replies, to * UDP_MAX_PAYLOAD_SIZE bytes. */ limit_to_mtu = c->udp || (1 == c->msgused); + /* We may need to start a new msghdr if this one is full. */ if (m->msg_iovlen == IOV_MAX || limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE) { add_msghdr(c); m = &c->msglist[c->msgused - 1]; } + if (ensure_iov_space(c)) return -1; + /* If the fragment is too big to fit in the datagram, split it up */ if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) { leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE; @@ -409,23 +470,30 @@ int add_iov(conn *c, const void *buf, int len) { } else { leftover = 0; } + m = &c->msglist[c->msgused - 1]; m->msg_iov[m->msg_iovlen].iov_base = buf; m->msg_iov[m->msg_iovlen].iov_len = len; + c->msgbytes += len; c->iovused++; m->msg_iovlen++; + buf = ((char *)buf) + len; len = leftover; } while (leftover > 0); + return 0; } + + /* * Constructs a set of UDP headers and attaches them to the outgoing messages. */ int build_udp_headers(conn *c) { int i; unsigned char *hdr; + if (c->msgused > c->hdrsize) { void *new_hdrbuf; if (c->hdrbuf) @@ -437,6 +505,7 @@ int build_udp_headers(conn *c) { c->hdrbuf = (unsigned char *) new_hdrbuf; c->hdrsize = c->msgused * 2; } + hdr = c->hdrbuf; for (i = 0; i < c->msgused; i++) { c->msglist[i].msg_iov[0].iov_base = hdr; @@ -451,83 +520,107 @@ int build_udp_headers(conn *c) { *hdr++ = 0; assert(hdr == c->msglist[i].iov[0].iov_base + UDP_HEADER_SIZE); } + return 0; } + + void out_string(conn *c, char *str) { int len; + if (settings.verbose > 1) fprintf(stderr, ">%d %s\n", c->sfd, str); + len = strlen(str); if (len + 2 > c->wsize) { /* ought to be always enough. just fail for simplicity */ str = "SERVER_ERROR output line too long"; len = strlen(str); } + strcpy(c->wbuf, str); strcpy(c->wbuf + len, "\r\n"); c->wbytes = len + 2; c->wcurr = c->wbuf; + conn_set_state(c, conn_write); c->write_and_go = conn_read; return; } + /* * we get here after reading the value in set/add/replace commands. The command * has been stored in c->item_comm, and the item is ready in c->item. */ + void complete_nread(conn *c) { item *it = c->item; int comm = c->item_comm; item *old_it; rel_time_t now = current_time; + stats.set_cmds++; + while(1) { if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) { out_string(c, "CLIENT_ERROR bad data chunk"); break; } + old_it = assoc_find(ITEM_key(it)); + if (old_it && settings.oldest_live && old_it->time <= settings.oldest_live) { item_unlink(old_it); old_it = 0; } + if (old_it && old_it->exptime && old_it->exptime < now) { item_unlink(old_it); old_it = 0; } + if (old_it && comm==NREAD_ADD) { item_update(old_it); out_string(c, "NOT_STORED"); break; } + if (!old_it && comm == NREAD_REPLACE) { out_string(c, "NOT_STORED"); break; } + if (old_it && (old_it->it_flags & ITEM_DELETED) && (comm == NREAD_REPLACE || comm == NREAD_ADD)) { out_string(c, "NOT_STORED"); break; } + if (old_it) { item_replace(old_it, it); } else item_link(it); + c->item = 0; out_string(c, "STORED"); return; } + item_free(it); c->item = 0; return; } + void process_stat(conn *c, char *command) { rel_time_t now = current_time; + if (strcmp(command, "stats") == 0) { char temp[1024]; pid_t pid = getpid(); char *pos = temp; struct rusage usage; + getrusage(RUSAGE_SELF, &usage); + pos += sprintf(pos, "STAT pid %u\r\n", pid); pos += sprintf(pos, "STAT uptime %u\r\n", now); pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started); @@ -551,17 +644,20 @@ void process_stat(conn *c, char *command) { out_string(c, temp); return; } + if (strcmp(command, "stats reset") == 0) { stats_reset(); out_string(c, "RESET"); return; } + #ifdef HAVE_MALLOC_H #ifdef HAVE_STRUCT_MALLINFO if (strcmp(command, "stats malloc") == 0) { char temp[512]; struct mallinfo info; char *pos = temp; + info = mallinfo(); pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena); pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks); @@ -578,22 +674,26 @@ void process_stat(conn *c, char *command) { } #endif /* HAVE_STRUCT_MALLINFO */ #endif /* HAVE_MALLOC_H */ + if (strcmp(command, "stats maps") == 0) { char *wbuf; int wsize = 8192; /* should be enough */ int fd; int res; + wbuf = (char *)malloc(wsize); if (wbuf == 0) { out_string(c, "SERVER_ERROR out of memory"); return; } + fd = open("/proc/self/maps", O_RDONLY); if (fd == -1) { out_string(c, "SERVER_ERROR cannot open the maps file"); free(wbuf); return; } + res = read(fd, wbuf, wsize - 6); /* 6 = END\r\n\0 */ if (res == wsize - 6) { out_string(c, "SERVER_ERROR buffer overflow"); @@ -614,6 +714,7 @@ void process_stat(conn *c, char *command) { close(fd); return; } + if (strncmp(command, "stats cachedump", 15) == 0) { char *buf; unsigned int bytes, id, limit = 0; @@ -622,11 +723,13 @@ void process_stat(conn *c, char *command) { out_string(c, "CLIENT_ERROR bad command line"); return; } + buf = item_cachedump(id, limit, &bytes); if (buf == 0) { out_string(c, "SERVER_ERROR out of memory"); return; } + c->write_and_free = buf; c->wcurr = buf; c->wbytes = bytes; @@ -634,6 +737,7 @@ void process_stat(conn *c, char *command) { c->write_and_go = conn_read; return; } + if (strcmp(command, "stats slabs")==0) { int bytes = 0; char *buf = slabs_stats(&bytes); @@ -648,12 +752,14 @@ void process_stat(conn *c, char *command) { c->write_and_go = conn_read; return; } + if (strcmp(command, "stats items")==0) { char buffer[4096]; item_stats(buffer, 4096); out_string(c, buffer); return; } + if (strcmp(command, "stats sizes")==0) { int bytes = 0; char *buf = item_stats_sizes(&bytes); @@ -661,6 +767,7 @@ void process_stat(conn *c, char *command) { out_string(c, "SERVER_ERROR out of memory"); return; } + c->write_and_free = buf; c->wcurr = buf; c->wbytes = bytes; @@ -668,17 +775,23 @@ void process_stat(conn *c, char *command) { c->write_and_go = conn_read; return; } + out_string(c, "ERROR"); } + void process_command(conn *c, char *command) { + int comm = 0; int incr = 0; + /* * for commands set/add/replace, we build an item and read the data * directly into it, then continue in nread_complete(). */ + if (settings.verbose > 1) fprintf(stderr, "<%d %s\n", c->sfd, command); + c->msgcurr = 0; c->msgused = 0; c->iovused = 0; @@ -686,19 +799,23 @@ void process_command(conn *c, char *command) { out_string(c, "SERVER_ERROR out of memory"); return; } + if ((strncmp(command, "add ", 4) == 0 && (comm = NREAD_ADD)) || (strncmp(command, "set ", 4) == 0 && (comm = NREAD_SET)) || (strncmp(command, "replace ", 8) == 0 && (comm = NREAD_REPLACE))) { + char key[251]; int flags; time_t expire; int len, res; item *it; + res = sscanf(command, "%*s %250s %u %ld %d\n", key, &flags, &expire, &len); if (res!=4 || strlen(key)==0 ) { out_string(c, "CLIENT_ERROR bad command line format"); return; } + if (settings.managed) { int bucket = c->bucket; if (bucket == -1) { @@ -711,8 +828,10 @@ void process_command(conn *c, char *command) { return; } } + expire = realtime(expire); it = item_alloc(key, flags, expire, len+2); + if (it == 0) { if (! item_size_ok(key, flags, len + 2)) out_string(c, "SERVER_ERROR object too large for cache"); @@ -723,6 +842,7 @@ void process_command(conn *c, char *command) { c->sbytes = len+2; return; } + c->item_comm = comm; c->item = it; c->ritem = ITEM_data(it); @@ -730,6 +850,7 @@ void process_command(conn *c, char *command) { conn_set_state(c, conn_nread); return; } + if ((strncmp(command, "incr ", 5) == 0 && (incr = 1)) || (strncmp(command, "decr ", 5) == 0)) { char temp[32]; @@ -789,11 +910,13 @@ void process_command(conn *c, char *command) { out_string(c, temp); return; } + if (strncmp(command, "bget ", 5) == 0) { c->binary = 1; goto get; } if (strncmp(command, "get ", 4) == 0) { + char *start = command + 4; char key[251]; int next; @@ -803,6 +926,7 @@ void process_command(conn *c, char *command) { get: now = current_time; i = 0; + if (settings.managed) { int bucket = c->bucket; if (bucket == -1) { @@ -815,6 +939,7 @@ void process_command(conn *c, char *command) { return; } } + while(sscanf(start, " %250s%n", key, &next) >= 1) { start+=next; stats.get_cmds++; @@ -827,6 +952,7 @@ void process_command(conn *c, char *command) { c->ilist = new_list; } else break; } + /* * Construct the response. Each hit adds three elements to the * outgoing data list: @@ -843,6 +969,7 @@ void process_command(conn *c, char *command) { } if (settings.verbose > 1) fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it)); + stats.get_hits++; it->refcount++; item_update(it); @@ -850,11 +977,14 @@ void process_command(conn *c, char *command) { i++; } else stats.get_misses++; } + c->icurr = c->ilist; c->ileft = i; + if (settings.verbose > 1) fprintf(stderr, ">%d END\n", c->sfd); add_iov(c, "END\r\n", 5); + if (c->udp && build_udp_headers(c)) { out_string(c, "SERVER_ERROR out of memory"); } @@ -864,11 +994,13 @@ void process_command(conn *c, char *command) { } return; } + if (strncmp(command, "delete ", 7) == 0) { char key[251]; item *it; int res; time_t exptime = 0; + if (settings.managed) { int bucket = c->bucket; if (bucket == -1) { @@ -906,6 +1038,7 @@ void process_command(conn *c, char *command) { return; } } + it->refcount++; /* use its expiration time as its deletion time now */ it->exptime = realtime(exptime); @@ -914,6 +1047,7 @@ void process_command(conn *c, char *command) { out_string(c, "DELETED"); return; } + if (strncmp(command, "own ", 4) == 0) { int bucket, gen; char *start = command+4; @@ -934,6 +1068,7 @@ void process_command(conn *c, char *command) { return; } } + if (strncmp(command, "disown ", 7) == 0) { int bucket; char *start = command+7; @@ -954,6 +1089,7 @@ void process_command(conn *c, char *command) { return; } } + if (strncmp(command, "bg ", 3) == 0) { int bucket, gen; char *start = command+3; @@ -976,35 +1112,43 @@ void process_command(conn *c, char *command) { return; } } + if (strncmp(command, "stats", 5) == 0) { process_stat(c, command); return; } + if (strncmp(command, "flush_all", 9) == 0) { time_t exptime = 0; int res; + if (strcmp(command, "flush_all") == 0) { settings.oldest_live = current_time; out_string(c, "OK"); return; } + res = sscanf(command, "%*s %ld", &exptime); if (res != 1) { out_string(c, "ERROR"); return; } + settings.oldest_live = realtime(exptime); out_string(c, "OK"); return; } + if (strcmp(command, "version") == 0) { out_string(c, "VERSION " VERSION); return; } + if (strcmp(command, "quit") == 0) { conn_set_state(c, conn_closing); return; } + if (strncmp(command, "slabs reassign ", 15) == 0) { #ifdef ALLOW_SLABS_REASSIGN int src, dst; @@ -1030,14 +1174,17 @@ void process_command(conn *c, char *command) { #endif return; } + out_string(c, "ERROR"); return; } + /* * if we have a complete line in the buffer, process it. */ int try_read_command(conn *c) { char *el, *cont; + if (!c->rbytes) return 0; el = memchr(c->rcurr, '\n', c->rbytes); @@ -1048,38 +1195,48 @@ int try_read_command(conn *c) { el--; } *el = '\0'; + process_command(c, c->rcurr); + c->rbytes -= (cont - c->rcurr); c->rcurr = cont; + return 1; } + /* * read a UDP request. * return 0 if there's nothing to read. */ int try_read_udp(conn *c) { int res; + c->request_addr_size = sizeof(c->request_addr); res = recvfrom(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes, 0, &c->request_addr, &c->request_addr_size); if (res > 8) { unsigned char *buf = (unsigned char *)c->rbuf + c->rbytes; stats.bytes_read += res; + /* Beginning of UDP packet is the request ID; save it. */ c->request_id = buf[0] * 256 + buf[1]; + /* If this is a multi-packet request, drop it. */ if (buf[4] != 0 || buf[5] != 1) { out_string(c, "SERVER_ERROR multi-packet request not supported"); return 0; } + /* Don't care about any of the rest of the header. */ res -= 8; memmove(c->rbuf, c->rbuf + 8, res); + c->rbytes += res; return 1; } return 0; } + /* * read from network as much as we can, handle buffer overflow and connection * close. @@ -1090,11 +1247,13 @@ int try_read_udp(conn *c) { int try_read_network(conn *c) { int gotdata = 0; int res; + if (c->rcurr != c->rbuf) { if (c->rbytes != 0) /* otherwise there's nothing to copy */ memmove(c->rbuf, c->rcurr, c->rbytes); c->rcurr = c->rbuf; } + while (1) { if (c->rbytes >= c->rsize) { char *new_rbuf = realloc(c->rbuf, c->rsize*2); @@ -1109,6 +1268,7 @@ int try_read_network(conn *c) { c->rcurr = c->rbuf = new_rbuf; c->rsize *= 2; } + /* unix socket mode doesn't need this, so zeroed out. but why * is this done for every command? presumably for UDP * mode. */ @@ -1117,6 +1277,7 @@ int try_read_network(conn *c) { } else { c->request_addr_size = 0; } + res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes); if (res > 0) { stats.bytes_read += res; @@ -1136,6 +1297,7 @@ int try_read_network(conn *c) { } return gotdata; } + int update_event(conn *c, int new_flags) { if (c->ev_flags == new_flags) return 1; @@ -1145,6 +1307,7 @@ int update_event(conn *c, int new_flags) { if (event_add(&c->event, 0) == -1) return 0; return 1; } + /* * Transmit the next chunk of data from our list of msgbuf structures. * @@ -1156,6 +1319,7 @@ int update_event(conn *c, int new_flags) { */ int transmit(conn *c) { int res; + if (c->msgcurr < c->msgused && c->msglist[c->msgcurr].msg_iovlen == 0) { /* Finished writing the current msg; advance to the next. */ @@ -1166,6 +1330,7 @@ int transmit(conn *c) { res = sendmsg(c->sfd, m, 0); if (res > 0) { stats.bytes_written += res; + /* We've written some of the data. Remove the completed iovec entries from the list of pending writes. */ while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) { @@ -1173,6 +1338,7 @@ int transmit(conn *c) { m->msg_iovlen--; m->msg_iov++; } + /* Might have written just part of the last iovec entry; adjust it so the next write will do the rest. */ if (res > 0) { @@ -1194,6 +1360,7 @@ int transmit(conn *c) { we have a real error, on which we close the connection */ if (settings.verbose > 0) perror("Failed to write, and not due to blocking"); + if (c->udp) conn_set_state(c, conn_read); else @@ -1203,13 +1370,16 @@ int transmit(conn *c) { return TRANSMIT_COMPLETE; } } + void drive_machine(conn *c) { + int exit = 0; int sfd, flags = 1; socklen_t addrlen; struct sockaddr addr; conn *newc; int res; + while (!exit) { switch(c->state) { case conn_listening: @@ -1237,7 +1407,9 @@ void drive_machine(conn *c) { close(sfd); break; } + break; + case conn_read: if (try_read_command(c)) { continue; @@ -1254,6 +1426,7 @@ void drive_machine(conn *c) { } exit = 1; break; + case conn_nread: /* we are reading rlbytes into ritem; */ if (c->rlbytes == 0) { @@ -1270,6 +1443,7 @@ void drive_machine(conn *c) { c->rbytes -= tocopy; break; } + /* now try reading from the socket */ res = read(c->sfd, c->ritem, c->rlbytes); if (res > 0) { @@ -1297,12 +1471,14 @@ void drive_machine(conn *c) { fprintf(stderr, "Failed to read, and not due to blocking\n"); conn_set_state(c, conn_closing); break; + case conn_swallow: /* we are reading sbytes and throwing them away */ if (c->sbytes == 0) { conn_set_state(c, conn_read); break; } + /* first check if we have leftovers in the conn_read buffer */ if (c->rbytes > 0) { int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes; @@ -1311,6 +1487,7 @@ void drive_machine(conn *c) { c->rbytes -= tocopy; break; } + /* now try reading from the socket */ res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize); if (res > 0) { @@ -1337,6 +1514,7 @@ void drive_machine(conn *c) { fprintf(stderr, "Failed to read, and not due to blocking\n"); conn_set_state(c, conn_closing); break; + case conn_write: /* * We want to write out a simple response. If we haven't already, @@ -1352,7 +1530,9 @@ void drive_machine(conn *c) { break; } } + /* fall through... */ + case conn_mwrite: switch (transmit(c)) { case TRANSMIT_COMPLETE: @@ -1377,14 +1557,17 @@ void drive_machine(conn *c) { conn_set_state(c, conn_closing); } break; + case TRANSMIT_INCOMPLETE: case TRANSMIT_HARD_ERROR: break; /* Continue in state machine. */ + case TRANSMIT_SOFT_ERROR: exit = 1; break; } break; + case conn_closing: if (c->udp) conn_cleanup(c); @@ -1393,13 +1576,18 @@ void drive_machine(conn *c) { exit = 1; break; } + } + return; } + void event_handler(int fd, short which, void *arg) { conn *c; + c = (conn *)arg; c->which = which; + /* sanity */ if (fd != c->sfd) { if (settings.verbose > 0) @@ -1407,18 +1595,23 @@ void event_handler(int fd, short which, void *arg) { conn_close(c); return; } + /* do as much I/O as possible until we block */ drive_machine(c); + /* wait for next event */ return; } + int new_socket(int is_udp) { int sfd; int flags; + if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) { perror("socket()"); return -1; } + if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); @@ -1427,6 +1620,8 @@ int new_socket(int is_udp) { } return sfd; } + + /* * Sets a socket's send buffer size to the maximum allowed by the system. */ @@ -1435,15 +1630,18 @@ void maximize_sndbuf(int sfd) { int last_good; int min, max, avg; int old_size; + /* Start with the default size. */ if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize)) { if (settings.verbose > 0) perror("getsockopt(SO_SNDBUF)"); return; } + /* Binary-search for the real maximum. */ min = old_size; max = MAX_SENDBUF_SIZE; + while (min <= max) { avg = ((unsigned int) min + max) / 2; if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &avg, intsize) == 0) { @@ -1453,17 +1651,22 @@ void maximize_sndbuf(int sfd) { max = avg - 1; } } + if (settings.verbose > 1) fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good); } + + int server_socket(int port, int is_udp) { int sfd; struct linger ling = {0, 0}; struct sockaddr_in addr; int flags =1; + if ((sfd = new_socket(is_udp)) == -1) { return -1; } + setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags)); if (is_udp) { maximize_sndbuf(sfd); @@ -1472,11 +1675,13 @@ int server_socket(int port, int is_udp) { setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)); setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags)); } + /* * the memset call clears nonstandard fields in some impementations * that otherwise mess things up. */ memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr = settings.interface; @@ -1492,13 +1697,16 @@ int server_socket(int port, int is_udp) { } return sfd; } + int new_socket_unix(void) { int sfd; int flags; + if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { perror("socket()"); return -1; } + if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); @@ -1507,18 +1715,22 @@ int new_socket_unix(void) { } return sfd; } + int server_socket_unix(char *path) { int sfd; struct linger ling = {0, 0}; struct sockaddr_un addr; struct stat tstat; int flags =1; + if (!path) { return -1; } + if ((sfd = new_socket_unix()) == -1) { return -1; } + /* * Clean up a previous socket file if we left it around */ @@ -1526,14 +1738,17 @@ int server_socket_unix(char *path) { if (S_ISSOCK(tstat.st_mode)) unlink(path); } + setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags)); setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)); setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)); + /* * the memset call clears nonstandard fields in some impementations * that otherwise mess things up. */ memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; strcpy(addr.sun_path, path); if (bind(sfd, (struct sockaddr *) &addr, sizeof(addr)) == -1) { @@ -1548,6 +1763,8 @@ int server_socket_unix(char *path) { } return sfd; } + + /* invoke right before gdb is called, on assert */ void pre_gdb () { int i = 0; @@ -1556,6 +1773,7 @@ void pre_gdb () { for (i=3; i<=500; i++) close(i); /* so lame */ kill(getpid(), SIGABRT); } + /* * We keep the current time of day in a global variable that's updated by a * timer event. This saves us a bunch of time() system calls (we really only @@ -1566,25 +1784,32 @@ void pre_gdb () { */ volatile rel_time_t current_time; struct event clockevent; + void clock_handler(int fd, short which, void *arg) { struct timeval t; static int initialized = 0; + if (initialized) { /* only delete the event if it's actually there. */ evtimer_del(&clockevent); } else { initialized = 1; } + evtimer_set(&clockevent, clock_handler, 0); t.tv_sec = 1; t.tv_usec = 0; evtimer_add(&clockevent, &t); + current_time = (rel_time_t) (time(0) - stats.started); } + struct event deleteevent; + void delete_handler(int fd, short which, void *arg) { struct timeval t; static int initialized = 0; + if (initialized) { /* some versions of libevent don't like deleting events that don't exist, so only delete once we know this event has been added. */ @@ -1592,9 +1817,11 @@ void delete_handler(int fd, short which, void *arg) { } else { initialized = 1; } + evtimer_set(&deleteevent, delete_handler, 0); t.tv_sec = 5; t.tv_usec=0; evtimer_add(&deleteevent, &t); + { int i, j=0; rel_time_t now = current_time; @@ -1612,6 +1839,7 @@ void delete_handler(int fd, short which, void *arg) { delcurr = j; } } + void usage(void) { printf(PACKAGE " " VERSION "\n"); printf("-p <num> port number to listen on\n"); @@ -1634,6 +1862,7 @@ void usage(void) { printf("-n <bytes> minimum space allocated for key+value+flags, default 48\n"); return; } + void usage_license(void) { printf(PACKAGE " " VERSION "\n\n"); printf( @@ -1701,35 +1930,45 @@ void usage_license(void) { "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n" "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n" ); + return; } + void save_pid(pid_t pid,char *pid_file) { FILE *fp; if (!pid_file) return; + if (!(fp = fopen(pid_file,"w"))) { fprintf(stderr,"Could not open the pid file %s for writing\n",pid_file); return; } + fprintf(fp,"%ld\n",(long) pid); if (fclose(fp) == -1) { fprintf(stderr,"Could not close the pid file %s.\n",pid_file); return; } } + void remove_pidfile(char *pid_file) { if (!pid_file) return; + if (unlink(pid_file)) { fprintf(stderr,"Could not remove the pid file %s.\n",pid_file); } + } + int l_socket=0; int u_socket=-1; + void sig_handler(int sig) { printf("SIGINT handled.\n"); exit(0); } + int main (int argc, char **argv) { int c; conn *l_conn; @@ -1743,12 +1982,16 @@ int main (int argc, char **argv) { struct sigaction sa; struct rlimit rlim; char *pid_file = NULL; + /* handle SIGINT */ signal(SIGINT, sig_handler); + /* init settings */ settings_init(); + /* set stderr non-buffering (for running under, say, daemontools) */ setbuf(stderr, NULL); + /* process arguments */ while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:")) != -1) { switch (c) { @@ -1824,6 +2067,7 @@ int main (int argc, char **argv) { return 1; } } + if (maxcore) { struct rlimit rlim_new; /* @@ -1844,15 +2088,18 @@ int main (int argc, char **argv) { * the soft limit ends up 0, because then no core files will be * created at all. */ + if ((getrlimit(RLIMIT_CORE, &rlim)!=0) || rlim.rlim_cur==0) { fprintf(stderr, "failed to ensure corefile creation\n"); exit(1); } } + /* * If needed, increase rlimits to allow as many connections * as needed. */ + if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) { fprintf(stderr, "failed to getrlimit number of files\n"); exit(1); @@ -1867,12 +2114,14 @@ int main (int argc, char **argv) { exit(1); } } + /* * initialization order: first create the listening sockets * (may need root on low ports), then drop root if needed, * then daemonise if needed, then init libevent (in some cases * descriptors created by libevent wouldn't survive forking). */ + /* create the listening socket and bind it */ if (!settings.socketpath) { l_socket = server_socket(settings.port, 0); @@ -1881,6 +2130,7 @@ int main (int argc, char **argv) { exit(1); } } + if (settings.udpport > 0 && ! settings.socketpath) { /* create the UDP listening socket and bind it */ u_socket = server_socket(settings.udpport, 1); @@ -1889,6 +2139,7 @@ int main (int argc, char **argv) { exit(1); } } + /* lose root privileges if we have them */ if (getuid()== 0 || geteuid()==0) { if (username==0 || *username=='\0') { @@ -1904,6 +2155,7 @@ int main (int argc, char **argv) { return 1; } } + /* create unix mode sockets after dropping privileges */ if (settings.socketpath) { l_socket = server_socket_unix(settings.socketpath); @@ -1912,6 +2164,7 @@ int main (int argc, char **argv) { exit(1); } } + /* daemonize if requested */ /* if we want to ensure our ability to dump core, don't chdir to / */ if (daemonize) { @@ -1922,6 +2175,8 @@ int main (int argc, char **argv) { return 1; } } + + /* initialize other stuff */ item_init(); event_init(); @@ -1929,6 +2184,7 @@ int main (int argc, char **argv) { assoc_init(); conn_init(); slabs_init(settings.maxbytes, settings.factor); + /* managed instance? alloc and zero a bucket array */ if (settings.managed) { buckets = malloc(sizeof(int)*MAX_BUCKETS); @@ -1938,6 +2194,7 @@ int main (int argc, char **argv) { } memset(buckets, 0, sizeof(int)*MAX_BUCKETS); } + /* lock paged memory if needed */ if (lock_memory) { #ifdef HAVE_MLOCKALL @@ -1946,6 +2203,7 @@ int main (int argc, char **argv) { fprintf(stderr, "warning: mlockall() not supported on this platform. proceeding without.\n"); #endif } + /* * ignore SIGPIPE signals; we can use errno==EPIPE if we * need that information diff --git a/memcached.h b/memcached.h index 2f57a72..621f0ec 100644 --- a/memcached.h +++ b/memcached.h @@ -5,19 +5,25 @@ #define UDP_MAX_PAYLOAD_SIZE 1400 #define UDP_HEADER_SIZE 8 #define MAX_SENDBUF_SIZE (256 * 1024 * 1024) + /* Initial size of list of items being returned by "get". */ #define ITEM_LIST_INITIAL 200 + /* Initial size of the sendmsg() scatter/gather array. */ #define IOV_LIST_INITIAL 400 + /* Initial number of sendmsg() argument structures to allocate. */ #define MSG_LIST_INITIAL 10 + /* High water marks for buffer shrinking */ #define READ_BUFFER_HIGHWAT 8192 #define ITEM_LIST_HIGHWAT 400 #define IOV_LIST_HIGHWAT 600 #define MSG_LIST_HIGHWAT 100 + /* Time relative to server start. Smaller than time_t on 64-bit systems. */ typedef unsigned int rel_time_t; + struct stats { unsigned int curr_items; unsigned int total_items; @@ -33,6 +39,7 @@ struct stats { unsigned long long bytes_read; unsigned long long bytes_written; }; + struct settings { size_t maxbytes; int maxconns; @@ -47,12 +54,16 @@ struct settings { double factor; /* chunk size growth factor */ int chunk_size; }; + extern struct stats stats; extern struct settings settings; + #define ITEM_LINKED 1 #define ITEM_DELETED 2 + /* temp */ #define ITEM_SLABBED 4 + typedef struct _stritem { struct _stritem *next; struct _stritem *prev; @@ -70,11 +81,14 @@ typedef struct _stritem { /* then " flags length\r\n" (no terminating null) */ /* then data with terminating \r\n (no terminating null; it's binary!) */ } item; + #define ITEM_key(item) ((char*)&((item)->end[0])) + /* warning: don't use these macros with a function, as it evals its arg twice */ #define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey) #define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + (item)->nsuffix) #define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + (item)->nsuffix + (item)->nbytes) + enum conn_states { conn_listening, /* the socket which listens for connections */ conn_read, /* reading in a command line */ @@ -84,50 +98,63 @@ enum conn_states { conn_closing, /* closing this connection */ conn_mwrite /* writing out many items sequentially */ }; + #define NREAD_ADD 1 #define NREAD_SET 2 #define NREAD_REPLACE 3 + typedef struct { int sfd; int state; struct event event; short ev_flags; short which; /* which events were just triggered */ + char *rbuf; /* buffer to read commands into */ char *rcurr; /* but if we parsed some already, this is where we stopped */ int rsize; /* total allocated size of rbuf */ int rbytes; /* how much data, starting from rcur, do we have unparsed */ + char *wbuf; char *wcurr; int wsize; int wbytes; int write_and_go; /* which state to go into after finishing current write */ void *write_and_free; /* free this memory after finishing writing */ + char *ritem; /* when we read in an item's value, it goes here */ int rlbytes; + /* data for the nread state */ + /* * item is used to hold an item structure created after reading the command * line of set/add/replace commands, but before we finished reading the actual * data. The data is read into ITEM_data(item) to avoid extra copying. */ + void *item; /* for commands set/add/replace */ int item_comm; /* which one is it: set/add/replace */ + /* data for the swallow state */ int sbytes; /* how many bytes to swallow */ + /* data for the mwrite state */ struct iovec *iov; int iovsize; /* number of elements allocated in iov[] */ int iovused; /* number of elements used in iov[] */ + struct msghdr *msglist; int msgsize; /* number of elements allocated in msglist[] */ int msgused; /* number of elements used in msglist[] */ int msgcurr; /* element in msglist[] being transmitted now */ int msgbytes; /* number of bytes in current msg */ + item **ilist; /* list of items to write out */ int isize; item **icurr; int ileft; + /* data for UDP clients */ int udp; /* 1 if this is a UDP "connection" */ int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ @@ -135,36 +162,48 @@ typedef struct { socklen_t request_addr_size; unsigned char *hdrbuf; /* udp packet headers */ int hdrsize; /* number of headers' worth of space is allocated */ + int binary; /* are we in binary mode */ int bucket; /* bucket number for the next command, if running as a managed instance. -1 (_not_ 0) means invalid. */ int gen; /* generation requested for the bucket */ } conn; + /* number of virtual buckets for a managed instance */ #define MAX_BUCKETS 32768 + /* listening socket */ extern int l_socket; + /* udp socket */ extern int u_socket; + /* current time of day (updated periodically) */ extern volatile rel_time_t current_time; + /* temporary hack */ /* #define assert(x) if(!(x)) { printf("assert failure: %s\n", #x); pre_gdb(); } void pre_gdb (); */ + /* * Functions */ + /* * given time value that's either unix time or delta from current unix time, return * unix time. Use the fact that delta can't exceed one month (and real time value can't * be that low). */ + rel_time_t realtime(time_t exptime); + /* slabs memory allocation */ + /* Init the subsystem. 1st argument is the limit on no. of bytes to allocate, 0 if no limit. 2nd argument is the growth factor; each slab will use a chunk size equal to the previous slab's chunk size times this factor. */ void slabs_init(size_t limit, double factor); + /* Preallocate as many slab pages as possible (called from slabs_init) on start-up, so users don't get confused out-of-memory errors when they do have free (in-slab) space, but no space to make new slabs. @@ -172,20 +211,26 @@ void slabs_init(size_t limit, double factor); slab types can be made. if max memory is less than 18 MB, only the smaller ones will be made. */ void slabs_preallocate (unsigned int maxslabs); + /* Given object size, return id to use when allocating/freeing memory for object */ /* 0 means error: can't store such a large object */ unsigned int slabs_clsid(size_t size); + /* Allocate object of given length. 0 on error */ void *slabs_alloc(size_t size); + /* Free previously allocated object */ void slabs_free(void *ptr, size_t size); + /* Fill buffer with stats */ char* slabs_stats(int *buflen); + /* Request some slab be moved between classes 1 = success 0 = fail -1 = tried. busy. send again shortly. */ int slabs_reassign(unsigned char srcid, unsigned char dstid); + /* event handling, network IO */ void event_handler(int fd, short which, void *arg); conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp); @@ -24,35 +24,47 @@ #include <errno.h> #include <event.h> #include <assert.h> + #include "memcached.h" + #define POWER_SMALLEST 1 #define POWER_LARGEST 200 #define POWER_BLOCK 1048576 #define CHUNK_ALIGN_BYTES (sizeof(void *)) + /* powers-of-N allocation structures */ + typedef struct { unsigned int size; /* sizes of items */ unsigned int perslab; /* how many items per slab */ + void **slots; /* list of item ptrs */ unsigned int sl_total; /* size of previous array */ unsigned int sl_curr; /* first free slot */ + void *end_page_ptr; /* pointer to next free item at end of page, or 0 */ unsigned int end_page_free; /* number of items remaining at end of last alloced page */ + unsigned int slabs; /* how many slabs were allocated for this class */ + void **slab_list; /* array of slab pointers */ unsigned int list_size; /* size of prev array */ + unsigned int killing; /* index+1 of dying slab, or zero if none */ } slabclass_t; + static slabclass_t slabclass[POWER_LARGEST+1]; static size_t mem_limit = 0; static size_t mem_malloced = 0; static int power_largest; + /* * Figures out which slab class (chunk size) is required to store an item of * a given size. */ unsigned int slabs_clsid(size_t size) { int res = POWER_SMALLEST; + if(size==0) return 0; while (size > slabclass[res].size) @@ -60,6 +72,7 @@ unsigned int slabs_clsid(size_t size) { return 0; return res; } + /* * Determines the chunk sizes and initializes the slab class descriptors * accordingly. @@ -67,15 +80,19 @@ unsigned int slabs_clsid(size_t size) { void slabs_init(size_t limit, double factor) { int i = POWER_SMALLEST - 1; unsigned int size = sizeof(item) + settings.chunk_size; + /* Factor of 2.0 means use the default memcached behavior */ if (factor == 2.0 && size < 128) size = 128; + mem_limit = limit; memset(slabclass, 0, sizeof(slabclass)); + while (++i < POWER_LARGEST && size <= POWER_BLOCK / 2) { /* Make sure items are always n-byte aligned */ if (size % CHUNK_ALIGN_BYTES) size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES); + slabclass[i].size = size; slabclass[i].perslab = POWER_BLOCK / slabclass[i].size; size *= factor; @@ -84,27 +101,34 @@ void slabs_init(size_t limit, double factor) { i, slabclass[i].size, slabclass[i].perslab); } } + power_largest = i; slabclass[power_largest].size = POWER_BLOCK; slabclass[power_largest].perslab = 1; + #ifndef DONT_PREALLOC_SLABS slabs_preallocate(limit / POWER_BLOCK); #endif } + void slabs_preallocate (unsigned int maxslabs) { int i; unsigned int prealloc = 0; + /* pre-allocate a 1MB slab in every size class so people don't get confused by non-intuitive "SERVER_ERROR out of memory" messages. this is the most common question on the mailing list. if you really don't want this, you can rebuild without these three lines. */ + for(i=POWER_SMALLEST; i<=POWER_LARGEST; i++) { if (++prealloc > maxslabs) return; slabs_newslab(i); } + } + static int grow_slab_list (unsigned int id) { slabclass_t *p = &slabclass[id]; if (p->slabs == p->list_size) { @@ -116,6 +140,7 @@ static int grow_slab_list (unsigned int id) { } return 1; } + int slabs_newslab(unsigned int id) { slabclass_t *p = &slabclass[id]; #ifdef ALLOW_SLABS_REASSIGN @@ -124,38 +149,50 @@ int slabs_newslab(unsigned int id) { int len = p->size * p->perslab; #endif char *ptr; + if (mem_limit && mem_malloced + len > mem_limit && p->slabs > 0) return 0; + if (! grow_slab_list(id)) return 0; + ptr = malloc(len); if (ptr == 0) return 0; + memset(ptr, 0, len); p->end_page_ptr = ptr; p->end_page_free = p->perslab; + p->slab_list[p->slabs++] = ptr; mem_malloced += len; return 1; } + void *slabs_alloc(size_t size) { slabclass_t *p; + unsigned char id = slabs_clsid(size); if (id < POWER_SMALLEST || id > power_largest) return 0; + p = &slabclass[id]; assert(p->sl_curr == 0 || ((item*)p->slots[p->sl_curr-1])->slabs_clsid == 0); + #ifdef USE_SYSTEM_MALLOC if (mem_limit && mem_malloced + size > mem_limit) return 0; mem_malloced += size; return malloc(size); #endif + /* fail unless we have space at the end of a recently allocated page, we have something on our freelist, or we could allocate a new page */ if (! (p->end_page_ptr || p->sl_curr || slabs_newslab(id))) return 0; + /* return off our freelist, if we have one */ if (p->sl_curr) return p->slots[--p->sl_curr]; + /* if we recently allocated a whole page, return from that */ if (p->end_page_ptr) { void *ptr = p->end_page_ptr; @@ -166,21 +203,27 @@ void *slabs_alloc(size_t size) { } return ptr; } + return 0; /* shouldn't ever get here */ } + void slabs_free(void *ptr, size_t size) { unsigned char id = slabs_clsid(size); slabclass_t *p; + assert(((item *)ptr)->slabs_clsid==0); assert(id >= POWER_SMALLEST && id <= power_largest); if (id < POWER_SMALLEST || id > power_largest) return; + p = &slabclass[id]; + #ifdef USE_SYSTEM_MALLOC mem_malloced -= size; free(ptr); return; #endif + if (p->sl_curr == p->sl_total) { /* need more space on the free list */ int new_size = p->sl_total ? p->sl_total*2 : 16; /* 16 is arbitrary */ void **new_slots = realloc(p->slots, new_size*sizeof(void *)); @@ -192,19 +235,24 @@ void slabs_free(void *ptr, size_t size) { p->slots[p->sl_curr++] = ptr; return; } + char* slabs_stats(int *buflen) { int i, total; char *buf = (char*) malloc(power_largest * 200 + 100); char *bufcurr = buf; + *buflen = 0; if (!buf) return 0; + total = 0; for(i = POWER_SMALLEST; i <= power_largest; i++) { slabclass_t *p = &slabclass[i]; if (p->slabs) { unsigned int perslab, slabs; + slabs = p->slabs; perslab = p->perslab; + bufcurr += sprintf(bufcurr, "STAT %d:chunk_size %u\r\n", i, p->size); bufcurr += sprintf(bufcurr, "STAT %d:chunks_per_page %u\r\n", i, perslab); bufcurr += sprintf(bufcurr, "STAT %d:total_pages %u\r\n", i, slabs); @@ -220,6 +268,7 @@ char* slabs_stats(int *buflen) { *buflen = bufcurr - buf; return buf; } + #ifdef ALLOW_SLABS_REASSIGN /* Blows away all the items in a slab class and moves its slabs to another class. This is only used by the "slabs reassign" command, for manual tweaking @@ -234,20 +283,27 @@ int slabs_reassign(unsigned char srcid, unsigned char dstid) { slabclass_t *p, *dp; void *iter; int was_busy = 0; + if (srcid < POWER_SMALLEST || srcid > power_largest || dstid < POWER_SMALLEST || dstid > power_largest) return 0; + p = &slabclass[srcid]; dp = &slabclass[dstid]; + /* fail if src still populating, or no slab to give up in src */ if (p->end_page_ptr || ! p->slabs) return 0; + /* fail if dst is still growing or we can't make room to hold its new one */ if (dp->end_page_ptr || ! grow_slab_list(dstid)) return 0; + if (p->killing == 0) p->killing = 1; + slab = p->slab_list[p->killing-1]; slab_end = slab + POWER_BLOCK; + for (iter=slab; iter<slab_end; iter+=p->size) { item *it = (item *) iter; if (it->slabs_clsid) { @@ -255,6 +311,7 @@ int slabs_reassign(unsigned char srcid, unsigned char dstid) { item_unlink(it); } } + /* go through free list and discard items that are no longer part of this slab */ { int fi; @@ -265,7 +322,9 @@ int slabs_reassign(unsigned char srcid, unsigned char dstid) { } } } + if (was_busy) return -1; + /* if good, now move it to the dst slab class */ p->slab_list[p->killing-1] = p->slab_list[p->slabs-1]; p->slabs--; |