summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--assoc.c23
-rw-r--r--daemon.c7
-rw-r--r--items.c52
-rw-r--r--memcached.c258
-rw-r--r--memcached.h45
-rw-r--r--slabs.c59
6 files changed, 444 insertions, 0 deletions
diff --git a/assoc.c b/assoc.c
index 4476d2c..c3ca035 100644
--- a/assoc.c
+++ b/assoc.c
@@ -27,13 +27,18 @@
#include <errno.h>
#include <event.h>
#include <assert.h>
+
#include "memcached.h"
+
typedef unsigned long int ub4; /* unsigned 4-byte quantities */
typedef unsigned char ub1; /* unsigned 1-byte quantities */
+
/* hard-code one million buckets, for now (2**20 == 4MB hash) */
#define HASHPOWER 20
+
#define hashsize(n) ((ub4)1<<(n))
#define hashmask(n) (hashsize(n)-1)
+
#define mix(a,b,c) \
{ \
a -= b; a -= c; a ^= (c>>13); \
@@ -46,6 +51,7 @@ typedef unsigned char ub1; /* unsigned 1-byte quantities */
b -= c; b -= a; b ^= (a<<10); \
c -= a; c -= b; c ^= (b>>15); \
}
+
/*
--------------------------------------------------------------------
hash() -- hash a variable-length key into a 32-bit value
@@ -55,30 +61,37 @@ hash() -- hash a variable-length key into a 32-bit value
Returns a 32-bit value. Every bit of the key affects every bit of
the return value. Every 1-bit and 2-bit delta achieves avalanche.
About 6*len+35 instructions.
+
The best hash table sizes are powers of 2. There is no need to do
mod a prime (mod is sooo slow!). If you need less than 32 bits,
use a bitmask. For example, if you need only 10 bits, do
h = (h & hashmask(10));
In which case, the hash table should have hashsize(10) elements.
+
If you are hashing n strings (ub1 **)k, do it like this:
for (i=0, h=0; i<n; ++i) h = hash( k[i], len[i], h);
+
By Bob Jenkins, 1996. bob_jenkins@burtleburtle.net. You may use this
code any way you wish, private, educational, or commercial. It's free.
+
See http://burtleburtle.net/bob/hash/evahash.html
Use for hash table lookup, or anything where one collision in 2^^32 is
acceptable. Do NOT use for cryptographic purposes.
--------------------------------------------------------------------
*/
+
ub4 hash( k, length, initval)
register ub1 *k; /* the key */
register ub4 length; /* the length of the key */
register ub4 initval; /* the previous hash, or an arbitrary value */
{
register ub4 a,b,c,len;
+
/* Set up the internal state */
len = length;
a = b = 0x9e3779b9; /* the golden ratio; an arbitrary value */
c = initval; /* the previous hash value */
+
/*---------------------------------------- handle most of the key */
while (len >= 12)
{
@@ -88,6 +101,7 @@ ub4 hash( k, length, initval)
mix(a,b,c);
k += 12; len -= 12;
}
+
/*------------------------------------- handle the last 11 bytes */
c += length;
switch(len) /* all the case statements fall through */
@@ -110,7 +124,9 @@ ub4 hash( k, length, initval)
/*-------------------------------------------- report the result */
return c;
}
+
static item** hashtable = 0;
+
void assoc_init(void) {
unsigned int hash_size = hashsize(HASHPOWER) * sizeof(void*);
hashtable = malloc(hash_size);
@@ -120,9 +136,11 @@ void assoc_init(void) {
}
memset(hashtable, 0, hash_size);
}
+
item *assoc_find(char *key) {
ub4 hv = hash(key, strlen(key), 0) & hashmask(HASHPOWER);
item *it = hashtable[hv];
+
while (it) {
if (strcmp(key, ITEM_key(it)) == 0)
return it;
@@ -130,16 +148,20 @@ item *assoc_find(char *key) {
}
return 0;
}
+
/* returns the address of the item pointer before the key. if *item == 0,
the item wasn't found */
+
static item** _hashitem_before (char *key) {
ub4 hv = hash(key, strlen(key), 0) & hashmask(HASHPOWER);
item **pos = &hashtable[hv];
+
while (*pos && strcmp(key, ITEM_key(*pos))) {
pos = &(*pos)->h_next;
}
return pos;
}
+
/* Note: this isn't an assoc_update. The key must not already exist to call this */
int assoc_insert(char *key, item *it) {
ub4 hv = hash(key, strlen(key), 0) & hashmask(HASHPOWER);
@@ -147,6 +169,7 @@ int assoc_insert(char *key, item *it) {
hashtable[hv] = it;
return 1;
}
+
void assoc_delete(char *key) {
item **before = _hashitem_before(key);
if (*before) {
diff --git a/daemon.c b/daemon.c
index a2502e0..b80c53e 100644
--- a/daemon.c
+++ b/daemon.c
@@ -28,18 +28,22 @@
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
+
#if defined __SUNPRO_C || defined __DECC || defined __HP_cc
# pragma ident "@(#)$Header: /cvsroot/wikipedia/willow/src/bin/willow/daemon.c,v 1.1 2005/05/02 19:15:21 kateturner Exp $"
# pragma ident "$NetBSD: daemon.c,v 1.9 2003/08/07 16:42:46 agc Exp $"
#endif
+
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
+
int
daemon(nochdir, noclose)
int nochdir, noclose;
{
int fd;
+
switch (fork()) {
case -1:
return (-1);
@@ -48,10 +52,13 @@ daemon(nochdir, noclose)
default:
_exit(0);
}
+
if (setsid() == -1)
return (-1);
+
if (!nochdir)
(void)chdir("/");
+
if (!noclose && (fd = open("/dev/null", O_RDWR, 0)) != -1) {
(void)dup2(fd, STDIN_FILENO);
(void)dup2(fd, STDOUT_FILENO);
diff --git a/items.c b/items.c
index 481764e..6d7df76 100644
--- a/items.c
+++ b/items.c
@@ -16,11 +16,15 @@
#include <time.h>
#include <event.h>
#include <assert.h>
+
#include "memcached.h"
+
+
#define LARGEST_ID 255
static item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];
unsigned int sizes[LARGEST_ID];
+
void item_init(void) {
int i;
for(i=0; i<LARGEST_ID; i++) {
@@ -29,6 +33,8 @@ void item_init(void) {
sizes[i]=0;
}
}
+
+
/*
* Generates the variable-sized part of the header for an object.
*
@@ -45,31 +51,40 @@ int item_make_header(char *key, int flags, int nbytes,
*nsuffix = sprintf(suffix, " %u %u\r\n", flags, nbytes - 2);
return sizeof(item) + *keylen + *nsuffix + nbytes;
}
+
item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) {
int nsuffix, ntotal, len;
item *it;
unsigned int id;
char suffix[40];
+
ntotal = item_make_header(key, flags, nbytes, suffix, &nsuffix, &len);
+
id = slabs_clsid(ntotal);
if (id == 0)
return 0;
+
it = slabs_alloc(ntotal);
if (it == 0) {
int tries = 50;
item *search;
+
/* If requested to not push old items out of cache when memory runs out,
* we're out of luck at this point...
*/
+
if (!settings.evict_to_free) return 0;
+
/*
* try to get one off the right LRU
* don't necessariuly unlink the tail because it may be locked: refcount>0
* search up from tail an item with refcount==0 and unlink it; give up after 50
* tries
*/
+
if (id > LARGEST_ID) return 0;
if (tails[id]==0) return 0;
+
for (search = tails[id]; tries>0 && search; tries--, search=search->prev) {
if (search->refcount==0) {
item_unlink(search);
@@ -79,9 +94,13 @@ item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) {
it = slabs_alloc(ntotal);
if (it==0) return 0;
}
+
assert(it->slabs_clsid == 0);
+
it->slabs_clsid = id;
+
assert(it != heads[it->slabs_clsid]);
+
it->next = it->prev = it->h_next = 0;
it->refcount = 0;
it->it_flags = 0;
@@ -93,17 +112,20 @@ item *item_alloc(char *key, int flags, rel_time_t exptime, int nbytes) {
it->nsuffix = nsuffix;
return it;
}
+
void item_free(item *it) {
unsigned int ntotal = ITEM_ntotal(it);
assert((it->it_flags & ITEM_LINKED) == 0);
assert(it != heads[it->slabs_clsid]);
assert(it != tails[it->slabs_clsid]);
assert(it->refcount == 0);
+
/* so slab size changer can tell later if item is already free or not */
it->slabs_clsid = 0;
it->it_flags |= ITEM_SLABBED;
slabs_free(it, ntotal);
}
+
/*
* Returns true if an item will fit in the cache (its size does not exceed
* the maximum for a cache entry.)
@@ -111,13 +133,16 @@ void item_free(item *it) {
int item_size_ok(char *key, int flags, int nbytes) {
char prefix[40];
int keylen, nsuffix;
+
return slabs_clsid(item_make_header(key, flags, nbytes,
prefix, &nsuffix, &keylen)) != 0;
}
+
void item_link_q(item *it) { /* item is the new head */
item **head, **tail;
assert(it->slabs_clsid <= LARGEST_ID);
assert((it->it_flags & ITEM_SLABBED) == 0);
+
head = &heads[it->slabs_clsid];
tail = &tails[it->slabs_clsid];
assert(it != *head);
@@ -130,11 +155,13 @@ void item_link_q(item *it) { /* item is the new head */
sizes[it->slabs_clsid]++;
return;
}
+
void item_unlink_q(item *it) {
item **head, **tail;
assert(it->slabs_clsid <= LARGEST_ID);
head = &heads[it->slabs_clsid];
tail = &tails[it->slabs_clsid];
+
if (*head == it) {
assert(it->prev == 0);
*head = it->next;
@@ -145,23 +172,29 @@ void item_unlink_q(item *it) {
}
assert(it->next != it);
assert(it->prev != it);
+
if (it->next) it->next->prev = it->prev;
if (it->prev) it->prev->next = it->next;
sizes[it->slabs_clsid]--;
return;
}
+
int item_link(item *it) {
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
assert(it->nbytes < 1048576);
it->it_flags |= ITEM_LINKED;
it->time = current_time;
assoc_insert(ITEM_key(it), it);
+
stats.curr_bytes += ITEM_ntotal(it);
stats.curr_items += 1;
stats.total_items += 1;
+
item_link_q(it);
+
return 1;
}
+
void item_unlink(item *it) {
if (it->it_flags & ITEM_LINKED) {
it->it_flags &= ~ITEM_LINKED;
@@ -172,6 +205,7 @@ void item_unlink(item *it) {
}
if (it->refcount == 0) item_free(it);
}
+
void item_remove(item *it) {
assert((it->it_flags & ITEM_SLABBED) == 0);
if (it->refcount) it->refcount--;
@@ -180,18 +214,24 @@ void item_remove(item *it) {
item_free(it);
}
}
+
void item_update(item *it) {
assert((it->it_flags & ITEM_SLABBED) == 0);
+
item_unlink_q(it);
it->time = current_time;
item_link_q(it);
}
+
int item_replace(item *it, item *new_it) {
assert((it->it_flags & ITEM_SLABBED) == 0);
+
item_unlink(it);
return item_link(new_it);
}
+
char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
+
int memlimit = 2*1024*1024;
char *buffer;
int bufcurr;
@@ -199,11 +239,14 @@ char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int
int len;
int shown = 0;
char temp[512];
+
if (slabs_clsid > LARGEST_ID) return 0;
it = heads[slabs_clsid];
+
buffer = malloc(memlimit);
if (buffer == 0) return 0;
bufcurr = 0;
+
while (it && (!limit || shown < limit)) {
len = sprintf(temp, "ITEM %s [%u b; %lu s]\r\n", ITEM_key(it), it->nbytes - 2, it->time + stats.started);
if (bufcurr + len + 6 > memlimit) /* 6 is END\r\n\0 */
@@ -213,19 +256,24 @@ char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int
shown++;
it = it->next;
}
+
strcpy(buffer+bufcurr, "END\r\n");
bufcurr+=5;
+
*bytes = bufcurr;
return buffer;
}
+
void item_stats(char *buffer, int buflen) {
int i;
char *bufcurr = buffer;
rel_time_t now = current_time;
+
if (buflen < 4096) {
strcpy(buffer, "SERVER_ERROR out of memory");
return;
}
+
for (i=0; i<LARGEST_ID; i++) {
if (tails[i])
bufcurr += sprintf(bufcurr, "STAT items:%u:number %u\r\nSTAT items:%u:age %u\r\n",
@@ -234,17 +282,20 @@ void item_stats(char *buffer, int buflen) {
strcpy(bufcurr, "END");
return;
}
+
/* dumps out a list of objects of each size, with granularity of 32 bytes */
char* item_stats_sizes(int *bytes) {
int num_buckets = 32768; /* max 1MB object, divided into 32 bytes size buckets */
unsigned int *histogram = (unsigned int*) malloc(num_buckets * sizeof(int));
char *buf = (char*) malloc(1024*1024*2*sizeof(char));
int i;
+
if (histogram == 0 || buf == 0) {
if (histogram) free(histogram);
if (buf) free(buf);
return 0;
}
+
/* build the histogram */
memset(histogram, 0, num_buckets * sizeof(int));
for (i=0; i<LARGEST_ID; i++) {
@@ -257,6 +308,7 @@ char* item_stats_sizes(int *bytes) {
iter = iter->next;
}
}
+
/* write the buffer */
*bytes = 0;
for (i=0; i<num_buckets; i++) {
diff --git a/memcached.c b/memcached.c
index 6a728fe..3601a42 100644
--- a/memcached.c
+++ b/memcached.c
@@ -47,30 +47,40 @@
#include <event.h>
#include <assert.h>
#include <limits.h>
+
#ifdef HAVE_MALLOC_H
#include <malloc.h>
#endif
+
#include "memcached.h"
+
struct stats stats;
struct settings settings;
+
static item **todelete = 0;
static int delcurr;
static int deltotal;
+
#define TRANSMIT_COMPLETE 0
#define TRANSMIT_INCOMPLETE 1
#define TRANSMIT_SOFT_ERROR 2
#define TRANSMIT_HARD_ERROR 3
+
int *buckets = 0; /* bucket->generation array for a managed instance */
+
#define REALTIME_MAXDELTA 60*60*24*30
rel_time_t realtime(time_t exptime) {
/* no. of seconds in 30 days - largest possible delta exptime */
+
if (exptime == 0) return 0; /* 0 means never expire */
+
if (exptime > REALTIME_MAXDELTA)
return (rel_time_t) (exptime - stats.started);
else {
return (rel_time_t) (exptime + current_time);
}
}
+
void stats_init(void) {
stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0;
@@ -82,6 +92,7 @@ void stats_reset(void) {
stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0;
stats.bytes_read = stats.bytes_written = 0;
}
+
void settings_init(void) {
settings.port = 11211;
settings.udpport = 11211;
@@ -113,6 +124,7 @@ item *get_item(char *key) {
}
return it;
}
+
/*
* Adds a message header to a connection.
*
@@ -121,6 +133,7 @@ item *get_item(char *key) {
int add_msghdr(conn *c)
{
struct msghdr *msg;
+
if (c->msgsize == c->msgused) {
msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
if (! msg)
@@ -128,6 +141,7 @@ int add_msghdr(conn *c)
c->msglist = msg;
c->msgsize *= 2;
}
+
msg = c->msglist + c->msgused;
msg->msg_iov = &c->iov[c->iovused];
msg->msg_iovlen = 0;
@@ -138,24 +152,30 @@ int add_msghdr(conn *c)
msg->msg_flags = 0;
c->msgbytes = 0;
c->msgused++;
+
if (c->udp) {
/* Leave room for the UDP header, which we'll fill in later. */
return add_iov(c, NULL, UDP_HEADER_SIZE);
}
+
return 0;
}
+
conn **freeconns;
int freetotal;
int freecurr;
+
void conn_init(void) {
freetotal = 200;
freecurr = 0;
freeconns = (conn **)malloc(sizeof (conn *)*freetotal);
return;
}
+
conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size,
int is_udp) {
conn *c;
+
/* do we have a free conn structure from a previous close? */
if (freecurr > 0) {
c = freeconns[--freecurr];
@@ -169,17 +189,20 @@ conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size,
c->iov = 0;
c->msglist = 0;
c->hdrbuf = 0;
+
c->rsize = read_buffer_size;
c->wsize = DATA_BUFFER_SIZE;
c->isize = ITEM_LIST_INITIAL;
c->iovsize = IOV_LIST_INITIAL;
c->msgsize = MSG_LIST_INITIAL;
c->hdrsize = 0;
+
c->rbuf = (char *) malloc(c->rsize);
c->wbuf = (char *) malloc(c->wsize);
c->ilist = (item **) malloc(sizeof(item *) * c->isize);
c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize);
c->msglist = (struct msghdr *) malloc(sizeof(struct msghdr) * c->msgsize);
+
if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
c->msglist == 0) {
if (c->rbuf != 0) free(c->rbuf);
@@ -191,10 +214,13 @@ conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size,
perror("malloc()");
return 0;
}
+
c->rsize = c->wsize = DATA_BUFFER_SIZE;
c->isize = 200; /* TODO: another instance of '200'. must kill all these */
+
stats.conn_structs++;
}
+
if (settings.verbose > 1) {
if (init_state == conn_listening)
fprintf(stderr, "<%d server listening\n", sfd);
@@ -203,6 +229,7 @@ conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size,
else
fprintf(stderr, "<%d new client connection\n", sfd);
}
+
c->sfd = sfd;
c->udp = is_udp;
c->state = init_state;
@@ -216,13 +243,16 @@ conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size,
c->iovused = 0;
c->msgcurr = 0;
c->msgused = 0;
+
c->write_and_go = conn_read;
c->write_and_free = 0;
c->item = 0;
c->bucket = -1;
c->gen = 0;
+
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
c->ev_flags = event_flags;
+
if (event_add(&c->event, 0) == -1) {
if (freecurr < freetotal) {
freeconns[freecurr++] = c;
@@ -238,25 +268,31 @@ conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size,
}
return 0;
}
+
stats.curr_conns++;
stats.total_conns++;
+
return c;
}
+
void conn_cleanup(conn *c) {
if (c->item) {
item_free(c->item);
c->item = 0;
}
+
if (c->ileft) {
for (; c->ileft > 0; c->ileft--,c->icurr++) {
item_remove(*(c->icurr));
}
}
+
if (c->write_and_free) {
free(c->write_and_free);
c->write_and_free = 0;
}
}
+
/*
* Frees a connection.
*/
@@ -277,13 +313,17 @@ static void conn_free(conn *c) {
free(c);
}
}
+
void conn_close(conn *c) {
/* delete the event, the socket and the conn */
event_del(&c->event);
+
if (settings.verbose > 1)
fprintf(stderr, "<%d connection closed.\n", c->sfd);
+
close(c->sfd);
conn_cleanup(c);
+
/* if the connection has big buffers, just free it */
if (c->rsize > READ_BUFFER_HIGHWAT) {
conn_free(c);
@@ -301,9 +341,12 @@ void conn_close(conn *c) {
conn_free(c);
}
}
+
stats.curr_conns--;
+
return;
}
+
/*
* Reallocates memory and updates a buffer size if successful.
*/
@@ -316,6 +359,7 @@ int do_realloc(void **orig, int newsize, int bytes_per_item, int *size) {
}
return 0;
}
+
/*
* Shrinks a connection's buffers if they're too big. This prevents
* periodic large "get" requests from permanently chewing lots of server
@@ -327,19 +371,24 @@ int do_realloc(void **orig, int newsize, int bytes_per_item, int *size) {
void conn_shrink(conn *c) {
if (c->udp)
return;
+
if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
do_realloc((void **)&c->rbuf, DATA_BUFFER_SIZE, 1, &c->rsize);
}
+
if (c->isize > ITEM_LIST_HIGHWAT) {
do_realloc((void **)&c->ilist, ITEM_LIST_INITIAL, sizeof(c->ilist[0]), &c->isize);
}
+
if (c->msgsize > MSG_LIST_HIGHWAT) {
do_realloc((void **)&c->msglist, MSG_LIST_INITIAL, sizeof(c->msglist[0]), &c->msgsize);
}
+
if (c->iovsize > IOV_LIST_HIGHWAT) {
do_realloc((void **)&c->iov, IOV_LIST_INITIAL, sizeof(c->iov[0]), &c->iovsize);
}
}
+
/*
* Sets a connection's current state in the state machine. Any special
* processing that needs to happen on certain state transitions can
@@ -353,6 +402,8 @@ void conn_set_state(conn *c, int state) {
c->state = state;
}
}
+
+
/*
* Ensures that there is room for another struct iovec in a connection's
* iov list.
@@ -368,40 +419,50 @@ int ensure_iov_space(conn *c) {
return -1;
c->iov = new_iov;
c->iovsize *= 2;
+
/* Point all the msghdr structures at the new list. */
for (i = 0, iovnum = 0; i < c->msgused; i++) {
c->msglist[i].msg_iov = &c->iov[iovnum];
iovnum += c->msglist[i].msg_iovlen;
}
}
+
return 0;
}
+
+
/*
* Adds data to the list of pending data that will be written out to a
* connection.
*
* Returns 0 on success, -1 on out-of-memory.
*/
+
int add_iov(conn *c, const void *buf, int len) {
struct msghdr *m;
int i;
int leftover;
int limit_to_mtu;
+
do {
m = &c->msglist[c->msgused - 1];
+
/*
* Limit UDP packets, and the first payloads of TCP replies, to
* UDP_MAX_PAYLOAD_SIZE bytes.
*/
limit_to_mtu = c->udp || (1 == c->msgused);
+
/* We may need to start a new msghdr if this one is full. */
if (m->msg_iovlen == IOV_MAX ||
limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE) {
add_msghdr(c);
m = &c->msglist[c->msgused - 1];
}
+
if (ensure_iov_space(c))
return -1;
+
/* If the fragment is too big to fit in the datagram, split it up */
if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
@@ -409,23 +470,30 @@ int add_iov(conn *c, const void *buf, int len) {
} else {
leftover = 0;
}
+
m = &c->msglist[c->msgused - 1];
m->msg_iov[m->msg_iovlen].iov_base = buf;
m->msg_iov[m->msg_iovlen].iov_len = len;
+
c->msgbytes += len;
c->iovused++;
m->msg_iovlen++;
+
buf = ((char *)buf) + len;
len = leftover;
} while (leftover > 0);
+
return 0;
}
+
+
/*
* Constructs a set of UDP headers and attaches them to the outgoing messages.
*/
int build_udp_headers(conn *c) {
int i;
unsigned char *hdr;
+
if (c->msgused > c->hdrsize) {
void *new_hdrbuf;
if (c->hdrbuf)
@@ -437,6 +505,7 @@ int build_udp_headers(conn *c) {
c->hdrbuf = (unsigned char *) new_hdrbuf;
c->hdrsize = c->msgused * 2;
}
+
hdr = c->hdrbuf;
for (i = 0; i < c->msgused; i++) {
c->msglist[i].msg_iov[0].iov_base = hdr;
@@ -451,83 +520,107 @@ int build_udp_headers(conn *c) {
*hdr++ = 0;
assert(hdr == c->msglist[i].iov[0].iov_base + UDP_HEADER_SIZE);
}
+
return 0;
}
+
+
void out_string(conn *c, char *str) {
int len;
+
if (settings.verbose > 1)
fprintf(stderr, ">%d %s\n", c->sfd, str);
+
len = strlen(str);
if (len + 2 > c->wsize) {
/* ought to be always enough. just fail for simplicity */
str = "SERVER_ERROR output line too long";
len = strlen(str);
}
+
strcpy(c->wbuf, str);
strcpy(c->wbuf + len, "\r\n");
c->wbytes = len + 2;
c->wcurr = c->wbuf;
+
conn_set_state(c, conn_write);
c->write_and_go = conn_read;
return;
}
+
/*
* we get here after reading the value in set/add/replace commands. The command
* has been stored in c->item_comm, and the item is ready in c->item.
*/
+
void complete_nread(conn *c) {
item *it = c->item;
int comm = c->item_comm;
item *old_it;
rel_time_t now = current_time;
+
stats.set_cmds++;
+
while(1) {
if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
out_string(c, "CLIENT_ERROR bad data chunk");
break;
}
+
old_it = assoc_find(ITEM_key(it));
+
if (old_it && settings.oldest_live &&
old_it->time <= settings.oldest_live) {
item_unlink(old_it);
old_it = 0;
}
+
if (old_it && old_it->exptime && old_it->exptime < now) {
item_unlink(old_it);
old_it = 0;
}
+
if (old_it && comm==NREAD_ADD) {
item_update(old_it);
out_string(c, "NOT_STORED");
break;
}
+
if (!old_it && comm == NREAD_REPLACE) {
out_string(c, "NOT_STORED");
break;
}
+
if (old_it && (old_it->it_flags & ITEM_DELETED) && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
out_string(c, "NOT_STORED");
break;
}
+
if (old_it) {
item_replace(old_it, it);
} else item_link(it);
+
c->item = 0;
out_string(c, "STORED");
return;
}
+
item_free(it);
c->item = 0;
return;
}
+
void process_stat(conn *c, char *command) {
rel_time_t now = current_time;
+
if (strcmp(command, "stats") == 0) {
char temp[1024];
pid_t pid = getpid();
char *pos = temp;
struct rusage usage;
+
getrusage(RUSAGE_SELF, &usage);
+
pos += sprintf(pos, "STAT pid %u\r\n", pid);
pos += sprintf(pos, "STAT uptime %u\r\n", now);
pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
@@ -551,17 +644,20 @@ void process_stat(conn *c, char *command) {
out_string(c, temp);
return;
}
+
if (strcmp(command, "stats reset") == 0) {
stats_reset();
out_string(c, "RESET");
return;
}
+
#ifdef HAVE_MALLOC_H
#ifdef HAVE_STRUCT_MALLINFO
if (strcmp(command, "stats malloc") == 0) {
char temp[512];
struct mallinfo info;
char *pos = temp;
+
info = mallinfo();
pos += sprintf(pos, "STAT arena_size %d\r\n", info.arena);
pos += sprintf(pos, "STAT free_chunks %d\r\n", info.ordblks);
@@ -578,22 +674,26 @@ void process_stat(conn *c, char *command) {
}
#endif /* HAVE_STRUCT_MALLINFO */
#endif /* HAVE_MALLOC_H */
+
if (strcmp(command, "stats maps") == 0) {
char *wbuf;
int wsize = 8192; /* should be enough */
int fd;
int res;
+
wbuf = (char *)malloc(wsize);
if (wbuf == 0) {
out_string(c, "SERVER_ERROR out of memory");
return;
}
+
fd = open("/proc/self/maps", O_RDONLY);
if (fd == -1) {
out_string(c, "SERVER_ERROR cannot open the maps file");
free(wbuf);
return;
}
+
res = read(fd, wbuf, wsize - 6); /* 6 = END\r\n\0 */
if (res == wsize - 6) {
out_string(c, "SERVER_ERROR buffer overflow");
@@ -614,6 +714,7 @@ void process_stat(conn *c, char *command) {
close(fd);
return;
}
+
if (strncmp(command, "stats cachedump", 15) == 0) {
char *buf;
unsigned int bytes, id, limit = 0;
@@ -622,11 +723,13 @@ void process_stat(conn *c, char *command) {
out_string(c, "CLIENT_ERROR bad command line");
return;
}
+
buf = item_cachedump(id, limit, &bytes);
if (buf == 0) {
out_string(c, "SERVER_ERROR out of memory");
return;
}
+
c->write_and_free = buf;
c->wcurr = buf;
c->wbytes = bytes;
@@ -634,6 +737,7 @@ void process_stat(conn *c, char *command) {
c->write_and_go = conn_read;
return;
}
+
if (strcmp(command, "stats slabs")==0) {
int bytes = 0;
char *buf = slabs_stats(&bytes);
@@ -648,12 +752,14 @@ void process_stat(conn *c, char *command) {
c->write_and_go = conn_read;
return;
}
+
if (strcmp(command, "stats items")==0) {
char buffer[4096];
item_stats(buffer, 4096);
out_string(c, buffer);
return;
}
+
if (strcmp(command, "stats sizes")==0) {
int bytes = 0;
char *buf = item_stats_sizes(&bytes);
@@ -661,6 +767,7 @@ void process_stat(conn *c, char *command) {
out_string(c, "SERVER_ERROR out of memory");
return;
}
+
c->write_and_free = buf;
c->wcurr = buf;
c->wbytes = bytes;
@@ -668,17 +775,23 @@ void process_stat(conn *c, char *command) {
c->write_and_go = conn_read;
return;
}
+
out_string(c, "ERROR");
}
+
void process_command(conn *c, char *command) {
+
int comm = 0;
int incr = 0;
+
/*
* for commands set/add/replace, we build an item and read the data
* directly into it, then continue in nread_complete().
*/
+
if (settings.verbose > 1)
fprintf(stderr, "<%d %s\n", c->sfd, command);
+
c->msgcurr = 0;
c->msgused = 0;
c->iovused = 0;
@@ -686,19 +799,23 @@ void process_command(conn *c, char *command) {
out_string(c, "SERVER_ERROR out of memory");
return;
}
+
if ((strncmp(command, "add ", 4) == 0 && (comm = NREAD_ADD)) ||
(strncmp(command, "set ", 4) == 0 && (comm = NREAD_SET)) ||
(strncmp(command, "replace ", 8) == 0 && (comm = NREAD_REPLACE))) {
+
char key[251];
int flags;
time_t expire;
int len, res;
item *it;
+
res = sscanf(command, "%*s %250s %u %ld %d\n", key, &flags, &expire, &len);
if (res!=4 || strlen(key)==0 ) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
+
if (settings.managed) {
int bucket = c->bucket;
if (bucket == -1) {
@@ -711,8 +828,10 @@ void process_command(conn *c, char *command) {
return;
}
}
+
expire = realtime(expire);
it = item_alloc(key, flags, expire, len+2);
+
if (it == 0) {
if (! item_size_ok(key, flags, len + 2))
out_string(c, "SERVER_ERROR object too large for cache");
@@ -723,6 +842,7 @@ void process_command(conn *c, char *command) {
c->sbytes = len+2;
return;
}
+
c->item_comm = comm;
c->item = it;
c->ritem = ITEM_data(it);
@@ -730,6 +850,7 @@ void process_command(conn *c, char *command) {
conn_set_state(c, conn_nread);
return;
}
+
if ((strncmp(command, "incr ", 5) == 0 && (incr = 1)) ||
(strncmp(command, "decr ", 5) == 0)) {
char temp[32];
@@ -789,11 +910,13 @@ void process_command(conn *c, char *command) {
out_string(c, temp);
return;
}
+
if (strncmp(command, "bget ", 5) == 0) {
c->binary = 1;
goto get;
}
if (strncmp(command, "get ", 4) == 0) {
+
char *start = command + 4;
char key[251];
int next;
@@ -803,6 +926,7 @@ void process_command(conn *c, char *command) {
get:
now = current_time;
i = 0;
+
if (settings.managed) {
int bucket = c->bucket;
if (bucket == -1) {
@@ -815,6 +939,7 @@ void process_command(conn *c, char *command) {
return;
}
}
+
while(sscanf(start, " %250s%n", key, &next) >= 1) {
start+=next;
stats.get_cmds++;
@@ -827,6 +952,7 @@ void process_command(conn *c, char *command) {
c->ilist = new_list;
} else break;
}
+
/*
* Construct the response. Each hit adds three elements to the
* outgoing data list:
@@ -843,6 +969,7 @@ void process_command(conn *c, char *command) {
}
if (settings.verbose > 1)
fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
+
stats.get_hits++;
it->refcount++;
item_update(it);
@@ -850,11 +977,14 @@ void process_command(conn *c, char *command) {
i++;
} else stats.get_misses++;
}
+
c->icurr = c->ilist;
c->ileft = i;
+
if (settings.verbose > 1)
fprintf(stderr, ">%d END\n", c->sfd);
add_iov(c, "END\r\n", 5);
+
if (c->udp && build_udp_headers(c)) {
out_string(c, "SERVER_ERROR out of memory");
}
@@ -864,11 +994,13 @@ void process_command(conn *c, char *command) {
}
return;
}
+
if (strncmp(command, "delete ", 7) == 0) {
char key[251];
item *it;
int res;
time_t exptime = 0;
+
if (settings.managed) {
int bucket = c->bucket;
if (bucket == -1) {
@@ -906,6 +1038,7 @@ void process_command(conn *c, char *command) {
return;
}
}
+
it->refcount++;
/* use its expiration time as its deletion time now */
it->exptime = realtime(exptime);
@@ -914,6 +1047,7 @@ void process_command(conn *c, char *command) {
out_string(c, "DELETED");
return;
}
+
if (strncmp(command, "own ", 4) == 0) {
int bucket, gen;
char *start = command+4;
@@ -934,6 +1068,7 @@ void process_command(conn *c, char *command) {
return;
}
}
+
if (strncmp(command, "disown ", 7) == 0) {
int bucket;
char *start = command+7;
@@ -954,6 +1089,7 @@ void process_command(conn *c, char *command) {
return;
}
}
+
if (strncmp(command, "bg ", 3) == 0) {
int bucket, gen;
char *start = command+3;
@@ -976,35 +1112,43 @@ void process_command(conn *c, char *command) {
return;
}
}
+
if (strncmp(command, "stats", 5) == 0) {
process_stat(c, command);
return;
}
+
if (strncmp(command, "flush_all", 9) == 0) {
time_t exptime = 0;
int res;
+
if (strcmp(command, "flush_all") == 0) {
settings.oldest_live = current_time;
out_string(c, "OK");
return;
}
+
res = sscanf(command, "%*s %ld", &exptime);
if (res != 1) {
out_string(c, "ERROR");
return;
}
+
settings.oldest_live = realtime(exptime);
out_string(c, "OK");
return;
}
+
if (strcmp(command, "version") == 0) {
out_string(c, "VERSION " VERSION);
return;
}
+
if (strcmp(command, "quit") == 0) {
conn_set_state(c, conn_closing);
return;
}
+
if (strncmp(command, "slabs reassign ", 15) == 0) {
#ifdef ALLOW_SLABS_REASSIGN
int src, dst;
@@ -1030,14 +1174,17 @@ void process_command(conn *c, char *command) {
#endif
return;
}
+
out_string(c, "ERROR");
return;
}
+
/*
* if we have a complete line in the buffer, process it.
*/
int try_read_command(conn *c) {
char *el, *cont;
+
if (!c->rbytes)
return 0;
el = memchr(c->rcurr, '\n', c->rbytes);
@@ -1048,38 +1195,48 @@ int try_read_command(conn *c) {
el--;
}
*el = '\0';
+
process_command(c, c->rcurr);
+
c->rbytes -= (cont - c->rcurr);
c->rcurr = cont;
+
return 1;
}
+
/*
* read a UDP request.
* return 0 if there's nothing to read.
*/
int try_read_udp(conn *c) {
int res;
+
c->request_addr_size = sizeof(c->request_addr);
res = recvfrom(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes,
0, &c->request_addr, &c->request_addr_size);
if (res > 8) {
unsigned char *buf = (unsigned char *)c->rbuf + c->rbytes;
stats.bytes_read += res;
+
/* Beginning of UDP packet is the request ID; save it. */
c->request_id = buf[0] * 256 + buf[1];
+
/* If this is a multi-packet request, drop it. */
if (buf[4] != 0 || buf[5] != 1) {
out_string(c, "SERVER_ERROR multi-packet request not supported");
return 0;
}
+
/* Don't care about any of the rest of the header. */
res -= 8;
memmove(c->rbuf, c->rbuf + 8, res);
+
c->rbytes += res;
return 1;
}
return 0;
}
+
/*
* read from network as much as we can, handle buffer overflow and connection
* close.
@@ -1090,11 +1247,13 @@ int try_read_udp(conn *c) {
int try_read_network(conn *c) {
int gotdata = 0;
int res;
+
if (c->rcurr != c->rbuf) {
if (c->rbytes != 0) /* otherwise there's nothing to copy */
memmove(c->rbuf, c->rcurr, c->rbytes);
c->rcurr = c->rbuf;
}
+
while (1) {
if (c->rbytes >= c->rsize) {
char *new_rbuf = realloc(c->rbuf, c->rsize*2);
@@ -1109,6 +1268,7 @@ int try_read_network(conn *c) {
c->rcurr = c->rbuf = new_rbuf;
c->rsize *= 2;
}
+
/* unix socket mode doesn't need this, so zeroed out. but why
* is this done for every command? presumably for UDP
* mode. */
@@ -1117,6 +1277,7 @@ int try_read_network(conn *c) {
} else {
c->request_addr_size = 0;
}
+
res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
if (res > 0) {
stats.bytes_read += res;
@@ -1136,6 +1297,7 @@ int try_read_network(conn *c) {
}
return gotdata;
}
+
int update_event(conn *c, int new_flags) {
if (c->ev_flags == new_flags)
return 1;
@@ -1145,6 +1307,7 @@ int update_event(conn *c, int new_flags) {
if (event_add(&c->event, 0) == -1) return 0;
return 1;
}
+
/*
* Transmit the next chunk of data from our list of msgbuf structures.
*
@@ -1156,6 +1319,7 @@ int update_event(conn *c, int new_flags) {
*/
int transmit(conn *c) {
int res;
+
if (c->msgcurr < c->msgused &&
c->msglist[c->msgcurr].msg_iovlen == 0) {
/* Finished writing the current msg; advance to the next. */
@@ -1166,6 +1330,7 @@ int transmit(conn *c) {
res = sendmsg(c->sfd, m, 0);
if (res > 0) {
stats.bytes_written += res;
+
/* We've written some of the data. Remove the completed
iovec entries from the list of pending writes. */
while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
@@ -1173,6 +1338,7 @@ int transmit(conn *c) {
m->msg_iovlen--;
m->msg_iov++;
}
+
/* Might have written just part of the last iovec entry;
adjust it so the next write will do the rest. */
if (res > 0) {
@@ -1194,6 +1360,7 @@ int transmit(conn *c) {
we have a real error, on which we close the connection */
if (settings.verbose > 0)
perror("Failed to write, and not due to blocking");
+
if (c->udp)
conn_set_state(c, conn_read);
else
@@ -1203,13 +1370,16 @@ int transmit(conn *c) {
return TRANSMIT_COMPLETE;
}
}
+
void drive_machine(conn *c) {
+
int exit = 0;
int sfd, flags = 1;
socklen_t addrlen;
struct sockaddr addr;
conn *newc;
int res;
+
while (!exit) {
switch(c->state) {
case conn_listening:
@@ -1237,7 +1407,9 @@ void drive_machine(conn *c) {
close(sfd);
break;
}
+
break;
+
case conn_read:
if (try_read_command(c)) {
continue;
@@ -1254,6 +1426,7 @@ void drive_machine(conn *c) {
}
exit = 1;
break;
+
case conn_nread:
/* we are reading rlbytes into ritem; */
if (c->rlbytes == 0) {
@@ -1270,6 +1443,7 @@ void drive_machine(conn *c) {
c->rbytes -= tocopy;
break;
}
+
/* now try reading from the socket */
res = read(c->sfd, c->ritem, c->rlbytes);
if (res > 0) {
@@ -1297,12 +1471,14 @@ void drive_machine(conn *c) {
fprintf(stderr, "Failed to read, and not due to blocking\n");
conn_set_state(c, conn_closing);
break;
+
case conn_swallow:
/* we are reading sbytes and throwing them away */
if (c->sbytes == 0) {
conn_set_state(c, conn_read);
break;
}
+
/* first check if we have leftovers in the conn_read buffer */
if (c->rbytes > 0) {
int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
@@ -1311,6 +1487,7 @@ void drive_machine(conn *c) {
c->rbytes -= tocopy;
break;
}
+
/* now try reading from the socket */
res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
if (res > 0) {
@@ -1337,6 +1514,7 @@ void drive_machine(conn *c) {
fprintf(stderr, "Failed to read, and not due to blocking\n");
conn_set_state(c, conn_closing);
break;
+
case conn_write:
/*
* We want to write out a simple response. If we haven't already,
@@ -1352,7 +1530,9 @@ void drive_machine(conn *c) {
break;
}
}
+
/* fall through... */
+
case conn_mwrite:
switch (transmit(c)) {
case TRANSMIT_COMPLETE:
@@ -1377,14 +1557,17 @@ void drive_machine(conn *c) {
conn_set_state(c, conn_closing);
}
break;
+
case TRANSMIT_INCOMPLETE:
case TRANSMIT_HARD_ERROR:
break; /* Continue in state machine. */
+
case TRANSMIT_SOFT_ERROR:
exit = 1;
break;
}
break;
+
case conn_closing:
if (c->udp)
conn_cleanup(c);
@@ -1393,13 +1576,18 @@ void drive_machine(conn *c) {
exit = 1;
break;
}
+
}
+
return;
}
+
void event_handler(int fd, short which, void *arg) {
conn *c;
+
c = (conn *)arg;
c->which = which;
+
/* sanity */
if (fd != c->sfd) {
if (settings.verbose > 0)
@@ -1407,18 +1595,23 @@ void event_handler(int fd, short which, void *arg) {
conn_close(c);
return;
}
+
/* do as much I/O as possible until we block */
drive_machine(c);
+
/* wait for next event */
return;
}
+
int new_socket(int is_udp) {
int sfd;
int flags;
+
if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {
perror("socket()");
return -1;
}
+
if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
perror("setting O_NONBLOCK");
@@ -1427,6 +1620,8 @@ int new_socket(int is_udp) {
}
return sfd;
}
+
+
/*
* Sets a socket's send buffer size to the maximum allowed by the system.
*/
@@ -1435,15 +1630,18 @@ void maximize_sndbuf(int sfd) {
int last_good;
int min, max, avg;
int old_size;
+
/* Start with the default size. */
if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize)) {
if (settings.verbose > 0)
perror("getsockopt(SO_SNDBUF)");
return;
}
+
/* Binary-search for the real maximum. */
min = old_size;
max = MAX_SENDBUF_SIZE;
+
while (min <= max) {
avg = ((unsigned int) min + max) / 2;
if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &avg, intsize) == 0) {
@@ -1453,17 +1651,22 @@ void maximize_sndbuf(int sfd) {
max = avg - 1;
}
}
+
if (settings.verbose > 1)
fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
}
+
+
int server_socket(int port, int is_udp) {
int sfd;
struct linger ling = {0, 0};
struct sockaddr_in addr;
int flags =1;
+
if ((sfd = new_socket(is_udp)) == -1) {
return -1;
}
+
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
if (is_udp) {
maximize_sndbuf(sfd);
@@ -1472,11 +1675,13 @@ int server_socket(int port, int is_udp) {
setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
}
+
/*
* the memset call clears nonstandard fields in some impementations
* that otherwise mess things up.
*/
memset(&addr, 0, sizeof(addr));
+
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr = settings.interface;
@@ -1492,13 +1697,16 @@ int server_socket(int port, int is_udp) {
}
return sfd;
}
+
int new_socket_unix(void) {
int sfd;
int flags;
+
if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
perror("socket()");
return -1;
}
+
if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
perror("setting O_NONBLOCK");
@@ -1507,18 +1715,22 @@ int new_socket_unix(void) {
}
return sfd;
}
+
int server_socket_unix(char *path) {
int sfd;
struct linger ling = {0, 0};
struct sockaddr_un addr;
struct stat tstat;
int flags =1;
+
if (!path) {
return -1;
}
+
if ((sfd = new_socket_unix()) == -1) {
return -1;
}
+
/*
* Clean up a previous socket file if we left it around
*/
@@ -1526,14 +1738,17 @@ int server_socket_unix(char *path) {
if (S_ISSOCK(tstat.st_mode))
unlink(path);
}
+
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+
/*
* the memset call clears nonstandard fields in some impementations
* that otherwise mess things up.
*/
memset(&addr, 0, sizeof(addr));
+
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, path);
if (bind(sfd, (struct sockaddr *) &addr, sizeof(addr)) == -1) {
@@ -1548,6 +1763,8 @@ int server_socket_unix(char *path) {
}
return sfd;
}
+
+
/* invoke right before gdb is called, on assert */
void pre_gdb () {
int i = 0;
@@ -1556,6 +1773,7 @@ void pre_gdb () {
for (i=3; i<=500; i++) close(i); /* so lame */
kill(getpid(), SIGABRT);
}
+
/*
* We keep the current time of day in a global variable that's updated by a
* timer event. This saves us a bunch of time() system calls (we really only
@@ -1566,25 +1784,32 @@ void pre_gdb () {
*/
volatile rel_time_t current_time;
struct event clockevent;
+
void clock_handler(int fd, short which, void *arg) {
struct timeval t;
static int initialized = 0;
+
if (initialized) {
/* only delete the event if it's actually there. */
evtimer_del(&clockevent);
} else {
initialized = 1;
}
+
evtimer_set(&clockevent, clock_handler, 0);
t.tv_sec = 1;
t.tv_usec = 0;
evtimer_add(&clockevent, &t);
+
current_time = (rel_time_t) (time(0) - stats.started);
}
+
struct event deleteevent;
+
void delete_handler(int fd, short which, void *arg) {
struct timeval t;
static int initialized = 0;
+
if (initialized) {
/* some versions of libevent don't like deleting events that don't exist,
so only delete once we know this event has been added. */
@@ -1592,9 +1817,11 @@ void delete_handler(int fd, short which, void *arg) {
} else {
initialized = 1;
}
+
evtimer_set(&deleteevent, delete_handler, 0);
t.tv_sec = 5; t.tv_usec=0;
evtimer_add(&deleteevent, &t);
+
{
int i, j=0;
rel_time_t now = current_time;
@@ -1612,6 +1839,7 @@ void delete_handler(int fd, short which, void *arg) {
delcurr = j;
}
}
+
void usage(void) {
printf(PACKAGE " " VERSION "\n");
printf("-p <num> port number to listen on\n");
@@ -1634,6 +1862,7 @@ void usage(void) {
printf("-n <bytes> minimum space allocated for key+value+flags, default 48\n");
return;
}
+
void usage_license(void) {
printf(PACKAGE " " VERSION "\n\n");
printf(
@@ -1701,35 +1930,45 @@ void usage_license(void) {
"(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
"THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
);
+
return;
}
+
void save_pid(pid_t pid,char *pid_file) {
FILE *fp;
if (!pid_file)
return;
+
if (!(fp = fopen(pid_file,"w"))) {
fprintf(stderr,"Could not open the pid file %s for writing\n",pid_file);
return;
}
+
fprintf(fp,"%ld\n",(long) pid);
if (fclose(fp) == -1) {
fprintf(stderr,"Could not close the pid file %s.\n",pid_file);
return;
}
}
+
void remove_pidfile(char *pid_file) {
if (!pid_file)
return;
+
if (unlink(pid_file)) {
fprintf(stderr,"Could not remove the pid file %s.\n",pid_file);
}
+
}
+
int l_socket=0;
int u_socket=-1;
+
void sig_handler(int sig) {
printf("SIGINT handled.\n");
exit(0);
}
+
int main (int argc, char **argv) {
int c;
conn *l_conn;
@@ -1743,12 +1982,16 @@ int main (int argc, char **argv) {
struct sigaction sa;
struct rlimit rlim;
char *pid_file = NULL;
+
/* handle SIGINT */
signal(SIGINT, sig_handler);
+
/* init settings */
settings_init();
+
/* set stderr non-buffering (for running under, say, daemontools) */
setbuf(stderr, NULL);
+
/* process arguments */
while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:")) != -1) {
switch (c) {
@@ -1824,6 +2067,7 @@ int main (int argc, char **argv) {
return 1;
}
}
+
if (maxcore) {
struct rlimit rlim_new;
/*
@@ -1844,15 +2088,18 @@ int main (int argc, char **argv) {
* the soft limit ends up 0, because then no core files will be
* created at all.
*/
+
if ((getrlimit(RLIMIT_CORE, &rlim)!=0) || rlim.rlim_cur==0) {
fprintf(stderr, "failed to ensure corefile creation\n");
exit(1);
}
}
+
/*
* If needed, increase rlimits to allow as many connections
* as needed.
*/
+
if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
fprintf(stderr, "failed to getrlimit number of files\n");
exit(1);
@@ -1867,12 +2114,14 @@ int main (int argc, char **argv) {
exit(1);
}
}
+
/*
* initialization order: first create the listening sockets
* (may need root on low ports), then drop root if needed,
* then daemonise if needed, then init libevent (in some cases
* descriptors created by libevent wouldn't survive forking).
*/
+
/* create the listening socket and bind it */
if (!settings.socketpath) {
l_socket = server_socket(settings.port, 0);
@@ -1881,6 +2130,7 @@ int main (int argc, char **argv) {
exit(1);
}
}
+
if (settings.udpport > 0 && ! settings.socketpath) {
/* create the UDP listening socket and bind it */
u_socket = server_socket(settings.udpport, 1);
@@ -1889,6 +2139,7 @@ int main (int argc, char **argv) {
exit(1);
}
}
+
/* lose root privileges if we have them */
if (getuid()== 0 || geteuid()==0) {
if (username==0 || *username=='\0') {
@@ -1904,6 +2155,7 @@ int main (int argc, char **argv) {
return 1;
}
}
+
/* create unix mode sockets after dropping privileges */
if (settings.socketpath) {
l_socket = server_socket_unix(settings.socketpath);
@@ -1912,6 +2164,7 @@ int main (int argc, char **argv) {
exit(1);
}
}
+
/* daemonize if requested */
/* if we want to ensure our ability to dump core, don't chdir to / */
if (daemonize) {
@@ -1922,6 +2175,8 @@ int main (int argc, char **argv) {
return 1;
}
}
+
+
/* initialize other stuff */
item_init();
event_init();
@@ -1929,6 +2184,7 @@ int main (int argc, char **argv) {
assoc_init();
conn_init();
slabs_init(settings.maxbytes, settings.factor);
+
/* managed instance? alloc and zero a bucket array */
if (settings.managed) {
buckets = malloc(sizeof(int)*MAX_BUCKETS);
@@ -1938,6 +2194,7 @@ int main (int argc, char **argv) {
}
memset(buckets, 0, sizeof(int)*MAX_BUCKETS);
}
+
/* lock paged memory if needed */
if (lock_memory) {
#ifdef HAVE_MLOCKALL
@@ -1946,6 +2203,7 @@ int main (int argc, char **argv) {
fprintf(stderr, "warning: mlockall() not supported on this platform. proceeding without.\n");
#endif
}
+
/*
* ignore SIGPIPE signals; we can use errno==EPIPE if we
* need that information
diff --git a/memcached.h b/memcached.h
index 2f57a72..621f0ec 100644
--- a/memcached.h
+++ b/memcached.h
@@ -5,19 +5,25 @@
#define UDP_MAX_PAYLOAD_SIZE 1400
#define UDP_HEADER_SIZE 8
#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
+
/* Initial size of list of items being returned by "get". */
#define ITEM_LIST_INITIAL 200
+
/* Initial size of the sendmsg() scatter/gather array. */
#define IOV_LIST_INITIAL 400
+
/* Initial number of sendmsg() argument structures to allocate. */
#define MSG_LIST_INITIAL 10
+
/* High water marks for buffer shrinking */
#define READ_BUFFER_HIGHWAT 8192
#define ITEM_LIST_HIGHWAT 400
#define IOV_LIST_HIGHWAT 600
#define MSG_LIST_HIGHWAT 100
+
/* Time relative to server start. Smaller than time_t on 64-bit systems. */
typedef unsigned int rel_time_t;
+
struct stats {
unsigned int curr_items;
unsigned int total_items;
@@ -33,6 +39,7 @@ struct stats {
unsigned long long bytes_read;
unsigned long long bytes_written;
};
+
struct settings {
size_t maxbytes;
int maxconns;
@@ -47,12 +54,16 @@ struct settings {
double factor; /* chunk size growth factor */
int chunk_size;
};
+
extern struct stats stats;
extern struct settings settings;
+
#define ITEM_LINKED 1
#define ITEM_DELETED 2
+
/* temp */
#define ITEM_SLABBED 4
+
typedef struct _stritem {
struct _stritem *next;
struct _stritem *prev;
@@ -70,11 +81,14 @@ typedef struct _stritem {
/* then " flags length\r\n" (no terminating null) */
/* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
+
#define ITEM_key(item) ((char*)&((item)->end[0]))
+
/* warning: don't use these macros with a function, as it evals its arg twice */
#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey)
#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + (item)->nsuffix)
#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + (item)->nsuffix + (item)->nbytes)
+
enum conn_states {
conn_listening, /* the socket which listens for connections */
conn_read, /* reading in a command line */
@@ -84,50 +98,63 @@ enum conn_states {
conn_closing, /* closing this connection */
conn_mwrite /* writing out many items sequentially */
};
+
#define NREAD_ADD 1
#define NREAD_SET 2
#define NREAD_REPLACE 3
+
typedef struct {
int sfd;
int state;
struct event event;
short ev_flags;
short which; /* which events were just triggered */
+
char *rbuf; /* buffer to read commands into */
char *rcurr; /* but if we parsed some already, this is where we stopped */
int rsize; /* total allocated size of rbuf */
int rbytes; /* how much data, starting from rcur, do we have unparsed */
+
char *wbuf;
char *wcurr;
int wsize;
int wbytes;
int write_and_go; /* which state to go into after finishing current write */
void *write_and_free; /* free this memory after finishing writing */
+
char *ritem; /* when we read in an item's value, it goes here */
int rlbytes;
+
/* data for the nread state */
+
/*
* item is used to hold an item structure created after reading the command
* line of set/add/replace commands, but before we finished reading the actual
* data. The data is read into ITEM_data(item) to avoid extra copying.
*/
+
void *item; /* for commands set/add/replace */
int item_comm; /* which one is it: set/add/replace */
+
/* data for the swallow state */
int sbytes; /* how many bytes to swallow */
+
/* data for the mwrite state */
struct iovec *iov;
int iovsize; /* number of elements allocated in iov[] */
int iovused; /* number of elements used in iov[] */
+
struct msghdr *msglist;
int msgsize; /* number of elements allocated in msglist[] */
int msgused; /* number of elements used in msglist[] */
int msgcurr; /* element in msglist[] being transmitted now */
int msgbytes; /* number of bytes in current msg */
+
item **ilist; /* list of items to write out */
int isize;
item **icurr;
int ileft;
+
/* data for UDP clients */
int udp; /* 1 if this is a UDP "connection" */
int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
@@ -135,36 +162,48 @@ typedef struct {
socklen_t request_addr_size;
unsigned char *hdrbuf; /* udp packet headers */
int hdrsize; /* number of headers' worth of space is allocated */
+
int binary; /* are we in binary mode */
int bucket; /* bucket number for the next command, if running as
a managed instance. -1 (_not_ 0) means invalid. */
int gen; /* generation requested for the bucket */
} conn;
+
/* number of virtual buckets for a managed instance */
#define MAX_BUCKETS 32768
+
/* listening socket */
extern int l_socket;
+
/* udp socket */
extern int u_socket;
+
/* current time of day (updated periodically) */
extern volatile rel_time_t current_time;
+
/* temporary hack */
/* #define assert(x) if(!(x)) { printf("assert failure: %s\n", #x); pre_gdb(); }
void pre_gdb (); */
+
/*
* Functions
*/
+
/*
* given time value that's either unix time or delta from current unix time, return
* unix time. Use the fact that delta can't exceed one month (and real time value can't
* be that low).
*/
+
rel_time_t realtime(time_t exptime);
+
/* slabs memory allocation */
+
/* Init the subsystem. 1st argument is the limit on no. of bytes to allocate,
0 if no limit. 2nd argument is the growth factor; each slab will use a chunk
size equal to the previous slab's chunk size times this factor. */
void slabs_init(size_t limit, double factor);
+
/* Preallocate as many slab pages as possible (called from slabs_init)
on start-up, so users don't get confused out-of-memory errors when
they do have free (in-slab) space, but no space to make new slabs.
@@ -172,20 +211,26 @@ void slabs_init(size_t limit, double factor);
slab types can be made. if max memory is less than 18 MB, only the
smaller ones will be made. */
void slabs_preallocate (unsigned int maxslabs);
+
/* Given object size, return id to use when allocating/freeing memory for object */
/* 0 means error: can't store such a large object */
unsigned int slabs_clsid(size_t size);
+
/* Allocate object of given length. 0 on error */
void *slabs_alloc(size_t size);
+
/* Free previously allocated object */
void slabs_free(void *ptr, size_t size);
+
/* Fill buffer with stats */
char* slabs_stats(int *buflen);
+
/* Request some slab be moved between classes
1 = success
0 = fail
-1 = tried. busy. send again shortly. */
int slabs_reassign(unsigned char srcid, unsigned char dstid);
+
/* event handling, network IO */
void event_handler(int fd, short which, void *arg);
conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp);
diff --git a/slabs.c b/slabs.c
index 9acc2ba..21e552d 100644
--- a/slabs.c
+++ b/slabs.c
@@ -24,35 +24,47 @@
#include <errno.h>
#include <event.h>
#include <assert.h>
+
#include "memcached.h"
+
#define POWER_SMALLEST 1
#define POWER_LARGEST 200
#define POWER_BLOCK 1048576
#define CHUNK_ALIGN_BYTES (sizeof(void *))
+
/* powers-of-N allocation structures */
+
typedef struct {
unsigned int size; /* sizes of items */
unsigned int perslab; /* how many items per slab */
+
void **slots; /* list of item ptrs */
unsigned int sl_total; /* size of previous array */
unsigned int sl_curr; /* first free slot */
+
void *end_page_ptr; /* pointer to next free item at end of page, or 0 */
unsigned int end_page_free; /* number of items remaining at end of last alloced page */
+
unsigned int slabs; /* how many slabs were allocated for this class */
+
void **slab_list; /* array of slab pointers */
unsigned int list_size; /* size of prev array */
+
unsigned int killing; /* index+1 of dying slab, or zero if none */
} slabclass_t;
+
static slabclass_t slabclass[POWER_LARGEST+1];
static size_t mem_limit = 0;
static size_t mem_malloced = 0;
static int power_largest;
+
/*
* Figures out which slab class (chunk size) is required to store an item of
* a given size.
*/
unsigned int slabs_clsid(size_t size) {
int res = POWER_SMALLEST;
+
if(size==0)
return 0;
while (size > slabclass[res].size)
@@ -60,6 +72,7 @@ unsigned int slabs_clsid(size_t size) {
return 0;
return res;
}
+
/*
* Determines the chunk sizes and initializes the slab class descriptors
* accordingly.
@@ -67,15 +80,19 @@ unsigned int slabs_clsid(size_t size) {
void slabs_init(size_t limit, double factor) {
int i = POWER_SMALLEST - 1;
unsigned int size = sizeof(item) + settings.chunk_size;
+
/* Factor of 2.0 means use the default memcached behavior */
if (factor == 2.0 && size < 128)
size = 128;
+
mem_limit = limit;
memset(slabclass, 0, sizeof(slabclass));
+
while (++i < POWER_LARGEST && size <= POWER_BLOCK / 2) {
/* Make sure items are always n-byte aligned */
if (size % CHUNK_ALIGN_BYTES)
size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
+
slabclass[i].size = size;
slabclass[i].perslab = POWER_BLOCK / slabclass[i].size;
size *= factor;
@@ -84,27 +101,34 @@ void slabs_init(size_t limit, double factor) {
i, slabclass[i].size, slabclass[i].perslab);
}
}
+
power_largest = i;
slabclass[power_largest].size = POWER_BLOCK;
slabclass[power_largest].perslab = 1;
+
#ifndef DONT_PREALLOC_SLABS
slabs_preallocate(limit / POWER_BLOCK);
#endif
}
+
void slabs_preallocate (unsigned int maxslabs) {
int i;
unsigned int prealloc = 0;
+
/* pre-allocate a 1MB slab in every size class so people don't get
confused by non-intuitive "SERVER_ERROR out of memory"
messages. this is the most common question on the mailing
list. if you really don't want this, you can rebuild without
these three lines. */
+
for(i=POWER_SMALLEST; i<=POWER_LARGEST; i++) {
if (++prealloc > maxslabs)
return;
slabs_newslab(i);
}
+
}
+
static int grow_slab_list (unsigned int id) {
slabclass_t *p = &slabclass[id];
if (p->slabs == p->list_size) {
@@ -116,6 +140,7 @@ static int grow_slab_list (unsigned int id) {
}
return 1;
}
+
int slabs_newslab(unsigned int id) {
slabclass_t *p = &slabclass[id];
#ifdef ALLOW_SLABS_REASSIGN
@@ -124,38 +149,50 @@ int slabs_newslab(unsigned int id) {
int len = p->size * p->perslab;
#endif
char *ptr;
+
if (mem_limit && mem_malloced + len > mem_limit && p->slabs > 0)
return 0;
+
if (! grow_slab_list(id)) return 0;
+
ptr = malloc(len);
if (ptr == 0) return 0;
+
memset(ptr, 0, len);
p->end_page_ptr = ptr;
p->end_page_free = p->perslab;
+
p->slab_list[p->slabs++] = ptr;
mem_malloced += len;
return 1;
}
+
void *slabs_alloc(size_t size) {
slabclass_t *p;
+
unsigned char id = slabs_clsid(size);
if (id < POWER_SMALLEST || id > power_largest)
return 0;
+
p = &slabclass[id];
assert(p->sl_curr == 0 || ((item*)p->slots[p->sl_curr-1])->slabs_clsid == 0);
+
#ifdef USE_SYSTEM_MALLOC
if (mem_limit && mem_malloced + size > mem_limit)
return 0;
mem_malloced += size;
return malloc(size);
#endif
+
/* fail unless we have space at the end of a recently allocated page,
we have something on our freelist, or we could allocate a new page */
if (! (p->end_page_ptr || p->sl_curr || slabs_newslab(id)))
return 0;
+
/* return off our freelist, if we have one */
if (p->sl_curr)
return p->slots[--p->sl_curr];
+
/* if we recently allocated a whole page, return from that */
if (p->end_page_ptr) {
void *ptr = p->end_page_ptr;
@@ -166,21 +203,27 @@ void *slabs_alloc(size_t size) {
}
return ptr;
}
+
return 0; /* shouldn't ever get here */
}
+
void slabs_free(void *ptr, size_t size) {
unsigned char id = slabs_clsid(size);
slabclass_t *p;
+
assert(((item *)ptr)->slabs_clsid==0);
assert(id >= POWER_SMALLEST && id <= power_largest);
if (id < POWER_SMALLEST || id > power_largest)
return;
+
p = &slabclass[id];
+
#ifdef USE_SYSTEM_MALLOC
mem_malloced -= size;
free(ptr);
return;
#endif
+
if (p->sl_curr == p->sl_total) { /* need more space on the free list */
int new_size = p->sl_total ? p->sl_total*2 : 16; /* 16 is arbitrary */
void **new_slots = realloc(p->slots, new_size*sizeof(void *));
@@ -192,19 +235,24 @@ void slabs_free(void *ptr, size_t size) {
p->slots[p->sl_curr++] = ptr;
return;
}
+
char* slabs_stats(int *buflen) {
int i, total;
char *buf = (char*) malloc(power_largest * 200 + 100);
char *bufcurr = buf;
+
*buflen = 0;
if (!buf) return 0;
+
total = 0;
for(i = POWER_SMALLEST; i <= power_largest; i++) {
slabclass_t *p = &slabclass[i];
if (p->slabs) {
unsigned int perslab, slabs;
+
slabs = p->slabs;
perslab = p->perslab;
+
bufcurr += sprintf(bufcurr, "STAT %d:chunk_size %u\r\n", i, p->size);
bufcurr += sprintf(bufcurr, "STAT %d:chunks_per_page %u\r\n", i, perslab);
bufcurr += sprintf(bufcurr, "STAT %d:total_pages %u\r\n", i, slabs);
@@ -220,6 +268,7 @@ char* slabs_stats(int *buflen) {
*buflen = bufcurr - buf;
return buf;
}
+
#ifdef ALLOW_SLABS_REASSIGN
/* Blows away all the items in a slab class and moves its slabs to another
class. This is only used by the "slabs reassign" command, for manual tweaking
@@ -234,20 +283,27 @@ int slabs_reassign(unsigned char srcid, unsigned char dstid) {
slabclass_t *p, *dp;
void *iter;
int was_busy = 0;
+
if (srcid < POWER_SMALLEST || srcid > power_largest ||
dstid < POWER_SMALLEST || dstid > power_largest)
return 0;
+
p = &slabclass[srcid];
dp = &slabclass[dstid];
+
/* fail if src still populating, or no slab to give up in src */
if (p->end_page_ptr || ! p->slabs)
return 0;
+
/* fail if dst is still growing or we can't make room to hold its new one */
if (dp->end_page_ptr || ! grow_slab_list(dstid))
return 0;
+
if (p->killing == 0) p->killing = 1;
+
slab = p->slab_list[p->killing-1];
slab_end = slab + POWER_BLOCK;
+
for (iter=slab; iter<slab_end; iter+=p->size) {
item *it = (item *) iter;
if (it->slabs_clsid) {
@@ -255,6 +311,7 @@ int slabs_reassign(unsigned char srcid, unsigned char dstid) {
item_unlink(it);
}
}
+
/* go through free list and discard items that are no longer part of this slab */
{
int fi;
@@ -265,7 +322,9 @@ int slabs_reassign(unsigned char srcid, unsigned char dstid) {
}
}
}
+
if (was_busy) return -1;
+
/* if good, now move it to the dst slab class */
p->slab_list[p->killing-1] = p->slab_list[p->slabs-1];
p->slabs--;