summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- doc/protocol.txt 2
-rw-r--r-- memcached.c 101
-rw-r--r-- memcached.h 7
-rwxr-xr-x t/getset.t 22
-rwxr-xr-x t/stats.t 4
-rw-r--r-- thread.c 9
6 files changed, 134 insertions, 11 deletions
diff --git a/doc/protocol.txt b/doc/protocol.txt
index 85b92f5..4678e21 100644
--- a/doc/protocol.txt
+++ b/doc/protocol.txt
@@ -1100,6 +1100,8 @@ integers separated by a colon (treat this as a floating point number).
| response_obj_bytes | 64u | Number of bytes used for response objects |
| response_obj_total | 64u | Total nummber of response objects |
| response_obj_free | 64u | Current free cached response objects |
+| read_buf_bytes | 64u | Total read buffer bytes allocated |
+| read_buf_bytes_free | 64u | Total read buffer bytes cached for reuse |
| reserved_fds | 32u | Number of misc fds used internally |
| cmd_get | 64u | Cumulative number of retrieval reqs |
| cmd_set | 64u | Cumulative number of storage reqs |
diff --git a/memcached.c b/memcached.c
index e206dcc..e46d93a 100644
--- a/memcached.c
+++ b/memcached.c
@@ -412,6 +412,57 @@ int stop_conn_timeout_thread(void) {
}
/*
+ * read buffer cache helper functions
+ */
+// Give up the connection's read buffer, but only when one is attached, it
+// holds no unread bytes (c->rbytes == 0) and the connection is not UDP.
+// A malloc'ed buffer is free()'d; otherwise the buffer is handed back to the
+// rbuf cache of the connection's thread. Afterwards the connection has no
+// read buffer (rbuf/rcurr NULL, rsize 0).
+static void rbuf_release(conn *c) {
+    if (c->rbuf == NULL || c->rbytes != 0 || IS_UDP(c->transport)) {
+        return;
+    }
+
+    if (!c->rbuf_malloced) {
+        do_cache_free(c->thread->rbuf_cache, c->rbuf);
+    } else {
+        free(c->rbuf);
+        c->rbuf_malloced = false;
+    }
+
+    c->rbuf = NULL;
+    c->rcurr = NULL;
+    c->rsize = 0;
+}
+
+// Ensure the connection has a read buffer. If none is attached, take one
+// from the rbuf cache of the connection's thread and point rcurr at its
+// start. Returns false, after counting a malloc failure in the global stats,
+// when the cache cannot provide a buffer; returns true otherwise.
+static bool rbuf_alloc(conn *c) {
+    if (c->rbuf != NULL) {
+        return true;
+    }
+
+    char *buf = do_cache_alloc(c->thread->rbuf_cache);
+    if (buf == NULL) {
+        STATS_LOCK();
+        stats.malloc_fails++;
+        STATS_UNLOCK();
+        return false;
+    }
+
+    c->rbuf = buf;
+    c->rcurr = buf;
+    c->rsize = READ_BUFFER_SIZE;
+    return true;
+}
+
+// Just for handling huge ASCII multigets.
+// The previous system was essentially the same; realloc'ing until big enough,
+// then realloc'ing back down after the request finished.
+//
+// Replaces the connection's cache-backed read buffer with a malloc'ed one of
+// twice the current size, carrying over the c->rbytes pending bytes found at
+// c->rcurr. Returns false and leaves the connection untouched when malloc
+// fails; returns true once the connection owns the malloc'ed buffer.
+static bool rbuf_switch_to_malloc(conn *c) {
+    // Might as well start with x2 and work from there.
+    size_t size = c->rsize * 2;
+    char *tmp = malloc(size);
+    if (!tmp) {
+        return false;
+    }
+
+    // Copy the pending bytes out BEFORE handing the old buffer back to the
+    // cache: once released, the old buffer must no longer be read from.
+    memcpy(tmp, c->rcurr, c->rbytes);
+    do_cache_free(c->thread->rbuf_cache, c->rbuf);
+
+    c->rcurr = c->rbuf = tmp;
+    c->rsize = size;
+    c->rbuf_malloced = true;
+    return true;
+}
+
+/*
* Initializes the connections array. We don't actually allocate connection
* structures until they're needed, so as to avoid wasting memory when the
* maximum connection count is much higher than the actual number of
@@ -537,13 +588,16 @@ conn *conn_new(const int sfd, enum conn_states init_state,
c->read = NULL;
c->sendmsg = NULL;
c->write = NULL;
- c->rbuf = 0;
+ c->rbuf = NULL;
c->rsize = read_buffer_size;
- c->rbuf = (char *)malloc((size_t)c->rsize);
+ // UDP connections use a persistent static buffer.
+ if (c->rsize) {
+ c->rbuf = (char *)malloc((size_t)c->rsize);
+ }
- if (c->rbuf == 0) {
+ if (c->rsize && c->rbuf == NULL) {
conn_free(c);
STATS_LOCK();
stats.malloc_fails++;
@@ -611,6 +665,7 @@ conn *conn_new(const int sfd, enum conn_states init_state,
c->rbytes = 0;
c->rcurr = c->rbuf;
c->ritem = 0;
+ c->rbuf_malloced = false;
c->sasl_started = false;
c->set_stale = false;
c->mset_res = false;
@@ -849,6 +904,12 @@ static void conn_close(conn *c) {
conn_cleanup(c);
+ // force release of read buffer.
+ if (c->thread) {
+ c->rbytes = 0;
+ rbuf_release(c);
+ }
+
MEMCACHED_CONN_RELEASE(c->sfd);
conn_set_state(c, conn_closed);
#ifdef TLS
@@ -2995,6 +3056,8 @@ static void server_stats(ADD_STAT add_stats, conn *c) {
APPEND_STAT("response_obj_bytes", "%llu", (unsigned long long)thread_stats.response_obj_bytes);
APPEND_STAT("response_obj_total", "%llu", (unsigned long long)thread_stats.response_obj_total);
APPEND_STAT("response_obj_free", "%llu", (unsigned long long)thread_stats.response_obj_free);
+ APPEND_STAT("read_buf_bytes", "%llu", (unsigned long long)thread_stats.read_buf_bytes);
+ APPEND_STAT("read_buf_bytes_free", "%llu", (unsigned long long)thread_stats.read_buf_bytes_free);
APPEND_STAT("reserved_fds", "%u", stats_state.reserved_fds);
APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds);
APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds);
@@ -6019,6 +6082,16 @@ static int try_read_command_ascii(conn *c) {
conn_set_state(c, conn_closing);
return 1;
}
+
+ // ASCII multigets are unbound, so our fixed size rbuf may not
+ // work for this particular workload... For backcompat we'll use a
+ // malloc/realloc/free routine just for this.
+ if (!c->rbuf_malloced) {
+ if (!rbuf_switch_to_malloc(c)) {
+ conn_set_state(c, conn_closing);
+ return 1;
+ }
+ }
}
return 0;
@@ -6105,7 +6178,8 @@ static enum try_read_result try_read_network(conn *c) {
}
while (1) {
- if (c->rbytes >= c->rsize) {
+ // TODO: move to rbuf_* func?
+ if (c->rbytes >= c->rsize && c->rbuf_malloced) {
if (num_allocs == 4) {
return gotdata;
}
@@ -6135,7 +6209,8 @@ static enum try_read_result try_read_network(conn *c) {
pthread_mutex_unlock(&c->thread->stats.mutex);
gotdata = READ_DATA_RECEIVED;
c->rbytes += res;
- if (res == avail) {
+ if (res == avail && c->rbuf_malloced) {
+ // Resize rbuf and try a few times if huge ascii multiget.
continue;
} else {
break;
@@ -6727,13 +6802,14 @@ static void drive_machine(conn *c) {
#endif
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
- DATA_BUFFER_SIZE, c->transport, ssl_v);
+ READ_BUFFER_CACHED, c->transport, ssl_v);
}
stop = true;
break;
case conn_waiting:
+ rbuf_release(c);
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
@@ -6746,7 +6822,18 @@ static void drive_machine(conn *c) {
break;
case conn_read:
- res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
+ if (!IS_UDP(c->transport)) {
+ // Assign a read buffer if necessary.
+ if (!rbuf_alloc(c)) {
+ // TODO: Some way to allow for temporary failures.
+ conn_set_state(c, conn_closing);
+ break;
+ }
+ res = try_read_network(c);
+ } else {
+ // UDP connections always have a static buffer.
+ res = try_read_udp(c);
+ }
switch (res) {
case READ_NO_DATA_RECEIVED:
diff --git a/memcached.h b/memcached.h
index a445ed3..fd7a136 100644
--- a/memcached.h
+++ b/memcached.h
@@ -61,8 +61,9 @@
/** Size of an incr buf. */
#define INCR_MAX_STORAGE_LEN 24
-#define DATA_BUFFER_SIZE 2048
#define WRITE_BUFFER_SIZE 1024
+#define READ_BUFFER_SIZE 16384
+#define READ_BUFFER_CACHED 0
#define UDP_READ_BUFFER_SIZE 65536
#define UDP_MAX_PAYLOAD_SIZE 1400
#define UDP_HEADER_SIZE 8
@@ -321,6 +322,8 @@ struct thread_stats {
uint64_t response_obj_bytes;
uint64_t response_obj_total;
uint64_t response_obj_free;
+ uint64_t read_buf_bytes;
+ uint64_t read_buf_bytes_free;
};
/**
@@ -593,6 +596,7 @@ typedef struct {
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *resp_cache; /* response objects */
+ cache_t *rbuf_cache; /* static-sized read buffers */
#ifdef EXTSTORE
cache_t *io_cache; /* IO objects */
void *storage; /* data object for storage system */
@@ -662,6 +666,7 @@ struct conn {
bool set_stale;
bool mset_res; /** uses mset format for return code */
bool close_after_write; /** flush write then move to close connection */
+ bool rbuf_malloced; /** read buffer was malloc'ed for ascii mget, needs free() */
#ifdef TLS
SSL *ssl;
char *ssl_wbuf;
diff --git a/t/getset.t b/t/getset.t
index 303a924..bdfd4bf 100755
--- a/t/getset.t
+++ b/t/getset.t
@@ -1,7 +1,7 @@
#!/usr/bin/perl
use strict;
-use Test::More tests => 539;
+use Test::More tests => 37991;
use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;
@@ -97,3 +97,23 @@ while ($len < 1024*1028) {
$len += 2048;
}
+{
+    # Large ASCII multiget: store one small item, then ask for it so many
+    # times in a single "get" that the request line is far longer than one
+    # fixed-size read buffer. Every copy of the key must come back.
+    print $sock "set foobarbaz 0 0 2\r\nhi\r\n";
+    is(scalar <$sock>, "STORED\r\n", "base foo stored");
+
+    # One key per 7 units of $len, counted the same way the list is sized.
+    my $len = 1024 * 128;
+    my $kcount = 0;
+    for (my $x = 0; $x < $len; $x += 7) {
+        $kcount++;
+    }
+    my $klist = "foobarbaz " x $kcount;
+    print $sock "get $klist\r\n";
+
+    # Two response lines (header + data) per requested key, then END.
+    for (1 .. $kcount) {
+        is(scalar <$sock>, "VALUE foobarbaz 0 2\r\n", "foo returned");
+        is(scalar <$sock>, "hi\r\n", "foo 'hi' returned");
+    }
+    is(scalar <$sock>, "END\r\n", "foo END seen");
+}
diff --git a/t/stats.t b/t/stats.t
index 0624df6..39c9228 100755
--- a/t/stats.t
+++ b/t/stats.t
@@ -26,9 +26,9 @@ my $stats = mem_stats($sock);
# Test number of keys
if (MemcachedTest::enabled_tls_testing()) {
# when TLS is enabled, stats contains time_since_server_cert_refresh
- is(scalar(keys(%$stats)), 75, "expected count of stats values");
+ is(scalar(keys(%$stats)), 77, "expected count of stats values");
} else {
- is(scalar(keys(%$stats)), 74, "expected count of stats values");
+ is(scalar(keys(%$stats)), 76, "expected count of stats values");
}
# Test initial state
diff --git a/thread.c b/thread.c
index 31a2a5d..2b91f3e 100644
--- a/thread.c
+++ b/thread.c
@@ -411,6 +411,13 @@ static void setup_thread(LIBEVENT_THREAD *me) {
fprintf(stderr, "Failed to create response cache\n");
exit(EXIT_FAILURE);
}
+
+ me->rbuf_cache = cache_create("rbuf", READ_BUFFER_SIZE, sizeof(char *), NULL, NULL);
+ if (me->rbuf_cache == NULL) {
+ fprintf(stderr, "Failed to create read buffer cache\n");
+ exit(EXIT_FAILURE);
+ }
+
#ifdef EXTSTORE
me->io_cache = cache_create("io", sizeof(io_wrap), sizeof(char*), NULL, NULL);
if (me->io_cache == NULL) {
@@ -805,6 +812,8 @@ void threadlocal_stats_aggregate(struct thread_stats *stats) {
stats->response_obj_bytes += threads[ii].resp_cache->total * sizeof(mc_resp);
stats->response_obj_total += threads[ii].resp_cache->total;
stats->response_obj_free += threads[ii].resp_cache->freecurr;
+ stats->read_buf_bytes += threads[ii].rbuf_cache->total * READ_BUFFER_SIZE;
+ stats->read_buf_bytes_free += threads[ii].rbuf_cache->freecurr * READ_BUFFER_SIZE;
pthread_mutex_unlock(&threads[ii].stats.mutex);
}
}