summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2016-07-01 14:41:29 -0700
committerdormando <dormando@rydia.net>2016-07-12 18:42:46 -0700
commitb05653f9a8ab3ee70431ef83a136e15f22e617ea (patch)
treededd967f3f5ef7e69b1c4943ead0b64722a419ec
parent0567967a925faf4e0a6bb7a181ab5aa1eff2272e (diff)
downloadmemcached-b05653f9a8ab3ee70431ef83a136e15f22e617ea.tar.gz
chunked item second checkpoint
can actually fetch items now, and fixed a few bugs with storage/freeing. added fetching for binprot. added some basic tests. many tests still fail for various reasons, and append/prepend isn't fixed yet.
-rw-r--r--memcached.c80
-rw-r--r--memcached.h1
-rw-r--r--slabs.c11
-rw-r--r--t/chunked-items.t54
4 files changed, 128 insertions, 18 deletions
diff --git a/memcached.c b/memcached.c
index 2f1d6a8..ff76e57 100644
--- a/memcached.c
+++ b/memcached.c
@@ -100,6 +100,7 @@ static void process_command(conn *c, char *command);
static void write_and_free(conn *c, char *buf, int bytes);
static int ensure_iov_space(conn *c);
static int add_iov(conn *c, const void *buf, int len);
+static int add_chunked_item_iovs(conn *c, item *it, int len);
static int add_msghdr(conn *c);
static void write_bin_error(conn *c, protocol_binary_response_status err,
const char *errstr, int swallow);
@@ -880,6 +881,20 @@ static int add_iov(conn *c, const void *buf, int len) {
return 0;
}
+static int add_chunked_item_iovs(conn *c, item *it, int len) {
+ assert(it->it_flags & ITEM_CHUNKED);
+ item_chunk *ch = (item_chunk *) ITEM_data(it);
+ while (ch) {
+ int todo = (len > ch->used) ? ch->used : len;
+ //fprintf(stderr, "ADDING AN IOV CHUNK FOR RESPONSE\n");
+ if (add_iov(c, ch->data, todo) != 0) {
+ return -1;
+ }
+ ch = ch->next;
+ len -= todo;
+ }
+ return 0;
+}
/*
* Constructs a set of UDP headers and attaches them to the outgoing messages.
@@ -1002,7 +1017,7 @@ static void complete_nread_ascii(conn *c) {
pthread_mutex_unlock(&c->thread->stats.mutex);
if ((it->it_flags & ITEM_CHUNKED) == 0) {
- if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
+ if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) {
is_valid = true;
}
} else {
@@ -1344,8 +1359,22 @@ static void complete_update_bin(conn *c) {
/* We don't actually receive the trailing two characters in the bin
* protocol, so we're going to just set them here */
- *(ITEM_data(it) + it->nbytes - 2) = '\r';
- *(ITEM_data(it) + it->nbytes - 1) = '\n';
+ if ((it->it_flags & ITEM_CHUNKED) == 0) {
+ *(ITEM_data(it) + it->nbytes - 2) = '\r';
+ *(ITEM_data(it) + it->nbytes - 1) = '\n';
+ } else {
+ assert(c->ritem);
+ item_chunk *ch = (item_chunk *) c->ritem;
+ fprintf(stderr, "BINPROT: WRITING DELIMITER INTO CHUNK TAIL\n");
+ if (ch->used > 1) {
+ ch->data[ch->used - 2] = '\r';
+ ch->data[ch->used - 1] = '\n';
+ } else {
+ assert(ch->used == 1);
+ ch->prev->data[ch->prev->used - 1] = '\r';
+ ch->data[ch->used - 1] = '\n';
+ }
+ }
ret = store_item(it, c->cmd, c);
@@ -1475,7 +1504,11 @@ static void process_bin_get_or_touch(conn *c) {
if (should_return_value) {
/* Add the data minus the CRLF */
- add_iov(c, ITEM_data(it), it->nbytes - 2);
+ if ((it->it_flags & ITEM_CHUNKED) == 0) {
+ add_iov(c, ITEM_data(it), it->nbytes - 2);
+ } else {
+ add_chunked_item_iovs(c, it, it->nbytes - 2);
+ }
}
conn_set_state(c, conn_mwrite);
@@ -3126,24 +3159,40 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
if (add_iov(c, "VALUE ", 6) != 0 ||
add_iov(c, ITEM_key(it), it->nkey) != 0 ||
add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
- add_iov(c, suffix, suffix_len) != 0 ||
- add_iov(c, ITEM_data(it), it->nbytes) != 0)
+ add_iov(c, suffix, suffix_len) != 0)
{
item_remove(it);
break;
}
+ if ((it->it_flags & ITEM_CHUNKED) == 0) {
+ add_iov(c, ITEM_data(it), it->nbytes);
+ } else if (add_chunked_item_iovs(c, it, it->nbytes) != 0) {
+ item_remove(it);
+ break;
+ }
}
else
{
MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
it->nbytes, ITEM_get_cas(it));
if (add_iov(c, "VALUE ", 6) != 0 ||
- add_iov(c, ITEM_key(it), it->nkey) != 0 ||
- add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
+ add_iov(c, ITEM_key(it), it->nkey) != 0)
{
item_remove(it);
break;
}
+ if ((it->it_flags & ITEM_CHUNKED) == 0)
+ {
+ if (add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
+ {
+ item_remove(it);
+ break;
+ }
+ } else if (add_iov(c, ITEM_suffix(it), it->nsuffix) != 0 ||
+ add_chunked_item_iovs(c, it, it->nbytes) != 0) {
+ item_remove(it);
+ break;
+ }
}
@@ -4286,6 +4335,7 @@ static enum transmit_result transmit(conn *c) {
}
/* Does a looped read to fill data chunks */
+/* FIXME: restrict number of times this can loop? */
static int read_into_chunked_item(conn *c) {
int total = 0;
int res;
@@ -4299,7 +4349,7 @@ static int read_into_chunked_item(conn *c) {
total = 0;
int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
tocopy = tocopy > unused ? unused : tocopy;
- fprintf(stderr, "READING [%d] FROM c->rbytes TOCOPY: [%d] UNUSED: [%d]\n", c->rbytes, tocopy, unused);
+ //fprintf(stderr, "COPYING [%d] FROM c->rbytes TOCOPY: [%d] UNUSED: [%d]\n", c->rbytes, tocopy, unused);
if (c->ritem != c->rcurr) {
memmove(ch->data + ch->used, c->rcurr, tocopy);
}
@@ -4315,7 +4365,7 @@ static int read_into_chunked_item(conn *c) {
/* now try reading from the socket */
res = read(c->sfd, ch->data + ch->used,
(unused > c->rlbytes ? c->rlbytes : unused));
- fprintf(stderr, "READ [%d] DATA INTO A CHUNK\n", res);
+ //fprintf(stderr, "READ [%d] DATA INTO A CHUNK\n", res);
if (res > 0) {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.bytes_read += res;
@@ -4323,7 +4373,7 @@ static int read_into_chunked_item(conn *c) {
ch->used += res;
total += res;
c->rlbytes -= res;
- break;
+ //break;
} else {
/* Reset total to the latest result so caller can handle it */
total = res;
@@ -4333,7 +4383,7 @@ static int read_into_chunked_item(conn *c) {
assert(ch->used <= ch->size);
if (ch->size == ch->used) {
- fprintf(stderr, "ADVANCING TO NEXT CHUNK\n");
+ //fprintf(stderr, "ADVANCING TO NEXT CHUNK\n");
if (ch->next) {
c->ritem = (char *) ch->next;
} else {
@@ -4502,9 +4552,8 @@ static void drive_machine(conn *c) {
conn_set_state(c, conn_closing);
break;
}
- /* FIXME: think the binprot gets here without an item set. */
- assert(c->item);
- if (( ((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) {
+
+ if (!c->item || (((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) {
/* first check if we have leftovers in the conn_read buffer */
if (c->rbytes > 0) {
int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
@@ -4535,6 +4584,7 @@ static void drive_machine(conn *c) {
}
} else {
res = read_into_chunked_item(c);
+ //fprintf(stderr, "GOT %d RES FROM READ_INTO_CHUNKED_ITEM\n", res);
if (res > 0)
break;
}
diff --git a/memcached.h b/memcached.h
index 1f44632..cb5705f 100644
--- a/memcached.h
+++ b/memcached.h
@@ -444,6 +444,7 @@ typedef struct _strchunk {
unsigned short refcount; /* used? */
uint8_t nsuffix; /* unused */
uint8_t it_flags; /* ITEM_* above. */
+ uint8_t slabs_clsid; /* Same as above. */
char data[];
} item_chunk;
diff --git a/slabs.c b/slabs.c
index 1cbc428..4b43395 100644
--- a/slabs.c
+++ b/slabs.c
@@ -367,10 +367,10 @@ static void do_slabs_free_chunked(item *it, const size_t size, unsigned int id,
chunk = chunk->next;
}
chunk = (item_chunk *) ITEM_data(it);
- fprintf(stderr, "FREEING CHUNKED ITEM INTO SLABS: SIZE: [%lu] REALSIZE: [%lu]\n", size, realsize);
unsigned int chunks_req = realsize / p->size;
if (realsize % p->size != 0)
chunks_req++;
+ fprintf(stderr, "FREEING CHUNKED ITEM INTO SLABS: SIZE: [%lu] REALSIZE: [%lu] CHUNKS_REQ: [%d]\n", size, realsize, chunks_req);
it->it_flags = ITEM_SLABBED;
it->slabs_clsid = 0;
@@ -384,10 +384,15 @@ static void do_slabs_free_chunked(item *it, const size_t size, unsigned int id,
for (x = 0; x < chunks_req-1; x++) {
chunk->it_flags = ITEM_SLABBED;
- chunk = chunk->next;
+ chunk->slabs_clsid = 0;
+ if (chunk->next)
+ chunk = chunk->next;
}
/* must have had nothing hanging off of the final chunk */
- assert(chunk == 0);
+ assert(chunk && chunk->next == 0);
+ /* Tail chunk, link the freelist here. */
+ chunk->next = p->slots;
+ if (chunk->next) chunk->next->prev = chunk;
p->slots = it;
p->sl_curr += chunks_req;
diff --git a/t/chunked-items.t b/t/chunked-items.t
new file mode 100644
index 0000000..b375741
--- /dev/null
+++ b/t/chunked-items.t
@@ -0,0 +1,54 @@
+#!/usr/bin/perl
+# Networked logging tests.
+
+use strict;
+use warnings;
+
+use Test::More;
+use FindBin qw($Bin);
+use lib "$Bin/lib";
+use MemcachedTest;
+
+my $server = new_memcached('-m 48');
+my $sock = $server->sock;
+
+# We're testing to ensure item chaining doesn't corrupt or poorly overlap
+# data, so create a non-repeating pattern.
+my @parts = ();
+for (1 .. 4000) {
+ push(@parts, $_);
+}
+my $pattern = join(':', @parts);
+
+my $plen = length($pattern);
+print STDERR "PATTERN LENGTH: $plen\n";
+
+print $sock "set pattern 0 0 $plen\r\n$pattern\r\n";
+is(scalar <$sock>, "STORED\r\n", "stored pattern successfully");
+
+mem_get_is($sock, "pattern", $pattern);
+
+for (1..5) {
+ my $size = 400 * 1024;
+ my $data = "x" x $size;
+ print $sock "set foo$_ 0 0 $size\r\n$data\r\n";
+ my $res = <$sock>;
+ is($res, "STORED\r\n", "stored some big items");
+}
+
+{
+ my $max = 1024 * 1024;
+ my $big = "a big value that's > .5M and < 1M. ";
+ while (length($big) * 2 < $max) {
+ $big = $big . $big;
+ }
+ my $biglen = length($big);
+
+ for (1..100) {
+ print $sock "set toast$_ 0 0 $biglen\r\n$big\r\n";
+ is(scalar <$sock>, "STORED\r\n", "stored big");
+ mem_get_is($sock, "toast$_", $big);
+ }
+}
+
+done_testing();