diff options
author | dormando <dormando@rydia.net> | 2016-07-01 14:41:29 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2016-07-12 18:42:46 -0700 |
commit | b05653f9a8ab3ee70431ef83a136e15f22e617ea (patch) | |
tree | dedd967f3f5ef7e69b1c4943ead0b64722a419ec | |
parent | 0567967a925faf4e0a6bb7a181ab5aa1eff2272e (diff) | |
download | memcached-b05653f9a8ab3ee70431ef83a136e15f22e617ea.tar.gz |
chunked item second checkpoint
can actually fetch items now, and fixed a few bugs with storage/freeing.
added fetching for binprot.
added some basic tests.
many tests still fail for various reasons, and append/prepend isn't fixed yet.
-rw-r--r-- | memcached.c | 80 | ||||
-rw-r--r-- | memcached.h | 1 | ||||
-rw-r--r-- | slabs.c | 11 | ||||
-rw-r--r-- | t/chunked-items.t | 54 |
4 files changed, 128 insertions, 18 deletions
diff --git a/memcached.c b/memcached.c index 2f1d6a8..ff76e57 100644 --- a/memcached.c +++ b/memcached.c @@ -100,6 +100,7 @@ static void process_command(conn *c, char *command); static void write_and_free(conn *c, char *buf, int bytes); static int ensure_iov_space(conn *c); static int add_iov(conn *c, const void *buf, int len); +static int add_chunked_item_iovs(conn *c, item *it, int len); static int add_msghdr(conn *c); static void write_bin_error(conn *c, protocol_binary_response_status err, const char *errstr, int swallow); @@ -880,6 +881,20 @@ static int add_iov(conn *c, const void *buf, int len) { return 0; } +static int add_chunked_item_iovs(conn *c, item *it, int len) { + assert(it->it_flags & ITEM_CHUNKED); + item_chunk *ch = (item_chunk *) ITEM_data(it); + while (ch) { + int todo = (len > ch->used) ? ch->used : len; + //fprintf(stderr, "ADDING AN IOV CHUNK FOR RESPONSE\n"); + if (add_iov(c, ch->data, todo) != 0) { + return -1; + } + ch = ch->next; + len -= todo; + } + return 0; +} /* * Constructs a set of UDP headers and attaches them to the outgoing messages. @@ -1002,7 +1017,7 @@ static void complete_nread_ascii(conn *c) { pthread_mutex_unlock(&c->thread->stats.mutex); if ((it->it_flags & ITEM_CHUNKED) == 0) { - if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) { + if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) { is_valid = true; } } else { @@ -1344,8 +1359,22 @@ static void complete_update_bin(conn *c) { /* We don't actually receive the trailing two characters in the bin * protocol, so we're going to just set them here */ - *(ITEM_data(it) + it->nbytes - 2) = '\r'; - *(ITEM_data(it) + it->nbytes - 1) = '\n'; + if ((it->it_flags & ITEM_CHUNKED) == 0) { + *(ITEM_data(it) + it->nbytes - 2) = '\r'; + *(ITEM_data(it) + it->nbytes - 1) = '\n'; + } else { + assert(c->ritem); + item_chunk *ch = (item_chunk *) c->ritem; + fprintf(stderr, "BINPROT: WRITING DELIMITER INTO CHUNK TAIL\n"); + if (ch->used > 1) { + ch->data[ch->used - 2] = '\r'; + ch->data[ch->used - 1] = '\n'; + } else { + assert(ch->used == 1); + ch->prev->data[ch->prev->used - 1] = '\r'; + ch->data[ch->used - 1] = '\n'; + } + } ret = store_item(it, c->cmd, c); @@ -1475,7 +1504,11 @@ static void process_bin_get_or_touch(conn *c) { if (should_return_value) { /* Add the data minus the CRLF */ - add_iov(c, ITEM_data(it), it->nbytes - 2); + if ((it->it_flags & ITEM_CHUNKED) == 0) { + add_iov(c, ITEM_data(it), it->nbytes - 2); + } else { + add_chunked_item_iovs(c, it, it->nbytes - 2); + } } conn_set_state(c, conn_mwrite); @@ -3126,24 +3159,40 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, if (add_iov(c, "VALUE ", 6) != 0 || add_iov(c, ITEM_key(it), it->nkey) != 0 || add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 || - add_iov(c, suffix, suffix_len) != 0 || - add_iov(c, ITEM_data(it), it->nbytes) != 0) + add_iov(c, suffix, suffix_len) != 0) { item_remove(it); break; } + if ((it->it_flags & ITEM_CHUNKED) == 0) { + add_iov(c, ITEM_data(it), it->nbytes); + } else if (add_chunked_item_iovs(c, it, it->nbytes) != 0) { + item_remove(it); + break; + } } else { MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey, it->nbytes, ITEM_get_cas(it)); if (add_iov(c, "VALUE ", 6) != 0 || - add_iov(c, ITEM_key(it), it->nkey) != 0 || - add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0) + add_iov(c, ITEM_key(it), it->nkey) != 0) { item_remove(it); break; } + if ((it->it_flags & ITEM_CHUNKED) == 0) + { + if (add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0) + { + item_remove(it); + break; + } + } else if (add_iov(c, ITEM_suffix(it), it->nsuffix) != 0 || + add_chunked_item_iovs(c, it, it->nbytes) != 0) { + item_remove(it); + break; + } } @@ -4286,6 +4335,7 @@ static enum transmit_result transmit(conn *c) { } /* Does a looped read to fill data chunks */ +/* FIXME: restrict number of times this can loop? */ static int read_into_chunked_item(conn *c) { int total = 0; int res; @@ -4299,7 +4349,7 @@ static int read_into_chunked_item(conn *c) { total = 0; int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes; tocopy = tocopy > unused ? unused : tocopy; - fprintf(stderr, "READING [%d] FROM c->rbytes TOCOPY: [%d] UNUSED: [%d]\n", c->rbytes, tocopy, unused); + //fprintf(stderr, "COPYING [%d] FROM c->rbytes TOCOPY: [%d] UNUSED: [%d]\n", c->rbytes, tocopy, unused); if (c->ritem != c->rcurr) { memmove(ch->data + ch->used, c->rcurr, tocopy); } @@ -4315,7 +4365,7 @@ static int read_into_chunked_item(conn *c) { /* now try reading from the socket */ res = read(c->sfd, ch->data + ch->used, (unused > c->rlbytes ? c->rlbytes : unused)); - fprintf(stderr, "READ [%d] DATA INTO A CHUNK\n", res); + //fprintf(stderr, "READ [%d] DATA INTO A CHUNK\n", res); if (res > 0) { pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.bytes_read += res; @@ -4323,7 +4373,7 @@ static int read_into_chunked_item(conn *c) { ch->used += res; total += res; c->rlbytes -= res; - break; + //break; } else { /* Reset total to the latest result so caller can handle it */ total = res; @@ -4333,7 +4383,7 @@ static int read_into_chunked_item(conn *c) { assert(ch->used <= ch->size); if (ch->size == ch->used) { - fprintf(stderr, "ADVANCING TO NEXT CHUNK\n"); + //fprintf(stderr, "ADVANCING TO NEXT CHUNK\n"); if (ch->next) { c->ritem = (char *) ch->next; } else { @@ -4502,9 +4552,8 @@ static void drive_machine(conn *c) { conn_set_state(c, conn_closing); break; } - /* FIXME: think the binprot gets here without an item set. */ - assert(c->item); - if (( ((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) { + + if (!c->item || (((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) { /* first check if we have leftovers in the conn_read buffer */ if (c->rbytes > 0) { int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes; @@ -4535,6 +4584,7 @@ static void drive_machine(conn *c) { } } else { res = read_into_chunked_item(c); + //fprintf(stderr, "GOT %d RES FROM READ_INTO_CHUNKED_ITEM\n", res); if (res > 0) break; } diff --git a/memcached.h b/memcached.h index 1f44632..cb5705f 100644 --- a/memcached.h +++ b/memcached.h @@ -444,6 +444,7 @@ typedef struct _strchunk { unsigned short refcount; /* used? */ uint8_t nsuffix; /* unused */ uint8_t it_flags; /* ITEM_* above. */ + uint8_t slabs_clsid; /* Same as above. */ char data[]; } item_chunk; @@ -367,10 +367,10 @@ static void do_slabs_free_chunked(item *it, const size_t size, unsigned int id, chunk = chunk->next; } chunk = (item_chunk *) ITEM_data(it); - fprintf(stderr, "FREEING CHUNKED ITEM INTO SLABS: SIZE: [%lu] REALSIZE: [%lu]\n", size, realsize); unsigned int chunks_req = realsize / p->size; if (realsize % p->size != 0) chunks_req++; + fprintf(stderr, "FREEING CHUNKED ITEM INTO SLABS: SIZE: [%lu] REALSIZE: [%lu] CHUNKS_REQ: [%d]\n", size, realsize, chunks_req); it->it_flags = ITEM_SLABBED; it->slabs_clsid = 0; @@ -384,10 +384,15 @@ static void do_slabs_free_chunked(item *it, const size_t size, unsigned int id, for (x = 0; x < chunks_req-1; x++) { chunk->it_flags = ITEM_SLABBED; - chunk = chunk->next; + chunk->slabs_clsid = 0; + if (chunk->next) + chunk = chunk->next; } /* must have had nothing hanging off of the final chunk */ - assert(chunk == 0); + assert(chunk && chunk->next == 0); + /* Tail chunk, link the freelist here. */ + chunk->next = p->slots; + if (chunk->next) chunk->next->prev = chunk; p->slots = it; p->sl_curr += chunks_req; diff --git a/t/chunked-items.t b/t/chunked-items.t new file mode 100644 index 0000000..b375741 --- /dev/null +++ b/t/chunked-items.t @@ -0,0 +1,54 @@ +#!/usr/bin/perl +# Networked logging tests. + +use strict; +use warnings; + +use Test::More; +use FindBin qw($Bin); +use lib "$Bin/lib"; +use MemcachedTest; + +my $server = new_memcached('-m 48'); +my $sock = $server->sock; + +# We're testing to ensure item chaining doesn't corrupt or poorly overlap +# data, so create a non-repeating pattern. +my @parts = (); +for (1 .. 4000) { + push(@parts, $_); +} +my $pattern = join(':', @parts); + +my $plen = length($pattern); +print STDERR "PATTERN LENGTH: $plen\n"; + +print $sock "set pattern 0 0 $plen\r\n$pattern\r\n"; +is(scalar <$sock>, "STORED\r\n", "stored pattern successfully"); + +mem_get_is($sock, "pattern", $pattern); + +for (1..5) { + my $size = 400 * 1024; + my $data = "x" x $size; + print $sock "set foo$_ 0 0 $size\r\n$data\r\n"; + my $res = <$sock>; + is($res, "STORED\r\n", "stored some big items"); +} + +{ + my $max = 1024 * 1024; + my $big = "a big value that's > .5M and < 1M. "; + while (length($big) * 2 < $max) { + $big = $big . $big; + } + my $biglen = length($big); + + for (1..100) { + print $sock "set toast$_ 0 0 $biglen\r\n$big\r\n"; + is(scalar <$sock>, "STORED\r\n", "stored big"); + mem_get_is($sock, "toast$_", $big); + } +} + +done_testing(); |