diff options
author | dormando <dormando@rydia.net> | 2019-10-16 19:23:51 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2020-02-01 14:36:23 -0800 |
commit | 2fb221781508c1c1fd274b709ce446efc5aa1e80 (patch) | |
tree | 9d43b3b0d337269a9fcd0a01532888e1fe80bede | |
parent | 8e59147cba140aa7d592b483806a2a8fadb562a2 (diff) | |
download | memcached-2fb221781508c1c1fd274b709ce446efc5aa1e80.tar.gz |
network: transient static read buffer for conns
Instead of starting each connection with a 2k read buffer and then
realloc'ing it every time a client sets a large item or does large
pipelined fetches, use a static, slightly larger buffer.
Idle connections no longer hold a buffer, freeing up a ton of memory.
To maintain compatibility with unbounded ASCII multigets, those fall back
to the old malloc/realloc/free routine that has been used since the dark
ages.
-rw-r--r-- | doc/protocol.txt | 2 | ||||
-rw-r--r-- | memcached.c | 101 | ||||
-rw-r--r-- | memcached.h | 7 | ||||
-rwxr-xr-x | t/getset.t | 22 | ||||
-rwxr-xr-x | t/stats.t | 4 | ||||
-rw-r--r-- | thread.c | 9 |
6 files changed, 134 insertions, 11 deletions
diff --git a/doc/protocol.txt b/doc/protocol.txt index 85b92f5..4678e21 100644 --- a/doc/protocol.txt +++ b/doc/protocol.txt @@ -1100,6 +1100,8 @@ integers separated by a colon (treat this as a floating point number). | response_obj_bytes | 64u | Number of bytes used for response objects | | response_obj_total | 64u | Total nummber of response objects | | response_obj_free | 64u | Current free cached response objects | +| read_buf_bytes | 64u | Total read buffer bytes allocated | +| read_buf_bytes_free | 64u | Total read buffer bytes cached for reuse | | reserved_fds | 32u | Number of misc fds used internally | | cmd_get | 64u | Cumulative number of retrieval reqs | | cmd_set | 64u | Cumulative number of storage reqs | diff --git a/memcached.c b/memcached.c index e206dcc..e46d93a 100644 --- a/memcached.c +++ b/memcached.c @@ -412,6 +412,57 @@ int stop_conn_timeout_thread(void) { } /* + * read buffer cache helper functions + */ +static void rbuf_release(conn *c) { + if (c->rbuf != NULL && c->rbytes == 0 && !IS_UDP(c->transport)) { + if (c->rbuf_malloced) { + free(c->rbuf); + c->rbuf_malloced = false; + } else { + do_cache_free(c->thread->rbuf_cache, c->rbuf); + } + c->rsize = 0; + c->rbuf = NULL; + c->rcurr = NULL; + } +} + +static bool rbuf_alloc(conn *c) { + if (c->rbuf == NULL) { + c->rbuf = do_cache_alloc(c->thread->rbuf_cache); + if (!c->rbuf) { + STATS_LOCK(); + stats.malloc_fails++; + STATS_UNLOCK(); + return false; + } + c->rsize = READ_BUFFER_SIZE; + c->rcurr = c->rbuf; + } + return true; +} + +// Just for handling huge ASCII multigets. +// The previous system was essentially the same; realloc'ing until big enough, +// then realloc'ing back down after the request finished. +static bool rbuf_switch_to_malloc(conn *c) { + // Might as well start with x2 and work from there. 
+ size_t size = c->rsize * 2; + char *tmp = malloc(size); + if (!tmp) + return false; + + do_cache_free(c->thread->rbuf_cache, c->rbuf); + memcpy(tmp, c->rcurr, c->rbytes); + + c->rcurr = c->rbuf = tmp; + c->rsize = size; + c->rbuf_malloced = true; + return true; +} + +/* * Initializes the connections array. We don't actually allocate connection * structures until they're needed, so as to avoid wasting memory when the * maximum connection count is much higher than the actual number of @@ -537,13 +588,16 @@ conn *conn_new(const int sfd, enum conn_states init_state, c->read = NULL; c->sendmsg = NULL; c->write = NULL; - c->rbuf = 0; + c->rbuf = NULL; c->rsize = read_buffer_size; - c->rbuf = (char *)malloc((size_t)c->rsize); + // UDP connections use a persistent static buffer. + if (c->rsize) { + c->rbuf = (char *)malloc((size_t)c->rsize); + } - if (c->rbuf == 0) { + if (c->rsize && c->rbuf == NULL) { conn_free(c); STATS_LOCK(); stats.malloc_fails++; @@ -611,6 +665,7 @@ conn *conn_new(const int sfd, enum conn_states init_state, c->rbytes = 0; c->rcurr = c->rbuf; c->ritem = 0; + c->rbuf_malloced = false; c->sasl_started = false; c->set_stale = false; c->mset_res = false; @@ -849,6 +904,12 @@ static void conn_close(conn *c) { conn_cleanup(c); + // force release of read buffer. 
+ if (c->thread) { + c->rbytes = 0; + rbuf_release(c); + } + MEMCACHED_CONN_RELEASE(c->sfd); conn_set_state(c, conn_closed); #ifdef TLS @@ -2995,6 +3056,8 @@ static void server_stats(ADD_STAT add_stats, conn *c) { APPEND_STAT("response_obj_bytes", "%llu", (unsigned long long)thread_stats.response_obj_bytes); APPEND_STAT("response_obj_total", "%llu", (unsigned long long)thread_stats.response_obj_total); APPEND_STAT("response_obj_free", "%llu", (unsigned long long)thread_stats.response_obj_free); + APPEND_STAT("read_buf_bytes", "%llu", (unsigned long long)thread_stats.read_buf_bytes); + APPEND_STAT("read_buf_bytes_free", "%llu", (unsigned long long)thread_stats.read_buf_bytes_free); APPEND_STAT("reserved_fds", "%u", stats_state.reserved_fds); APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds); APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds); @@ -6019,6 +6082,16 @@ static int try_read_command_ascii(conn *c) { conn_set_state(c, conn_closing); return 1; } + + // ASCII multigets are unbound, so our fixed size rbuf may not + // work for this particular workload... For backcompat we'll use a + // malloc/realloc/free routine just for this. + if (!c->rbuf_malloced) { + if (!rbuf_switch_to_malloc(c)) { + conn_set_state(c, conn_closing); + return 1; + } + } } return 0; @@ -6105,7 +6178,8 @@ static enum try_read_result try_read_network(conn *c) { } while (1) { - if (c->rbytes >= c->rsize) { + // TODO: move to rbuf_* func? + if (c->rbytes >= c->rsize && c->rbuf_malloced) { if (num_allocs == 4) { return gotdata; } @@ -6135,7 +6209,8 @@ static enum try_read_result try_read_network(conn *c) { pthread_mutex_unlock(&c->thread->stats.mutex); gotdata = READ_DATA_RECEIVED; c->rbytes += res; - if (res == avail) { + if (res == avail && c->rbuf_malloced) { + // Resize rbuf and try a few times if huge ascii multiget. 
continue; } else { break; @@ -6727,13 +6802,14 @@ static void drive_machine(conn *c) { #endif dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, - DATA_BUFFER_SIZE, c->transport, ssl_v); + READ_BUFFER_CACHED, c->transport, ssl_v); } stop = true; break; case conn_waiting: + rbuf_release(c); if (!update_event(c, EV_READ | EV_PERSIST)) { if (settings.verbose > 0) fprintf(stderr, "Couldn't update event\n"); @@ -6746,7 +6822,18 @@ static void drive_machine(conn *c) { break; case conn_read: - res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c); + if (!IS_UDP(c->transport)) { + // Assign a read buffer if necessary. + if (!rbuf_alloc(c)) { + // TODO: Some way to allow for temporary failures. + conn_set_state(c, conn_closing); + break; + } + res = try_read_network(c); + } else { + // UDP connections always have a static buffer. + res = try_read_udp(c); + } switch (res) { case READ_NO_DATA_RECEIVED: diff --git a/memcached.h b/memcached.h index a445ed3..fd7a136 100644 --- a/memcached.h +++ b/memcached.h @@ -61,8 +61,9 @@ /** Size of an incr buf. 
*/ #define INCR_MAX_STORAGE_LEN 24 -#define DATA_BUFFER_SIZE 2048 #define WRITE_BUFFER_SIZE 1024 +#define READ_BUFFER_SIZE 16384 +#define READ_BUFFER_CACHED 0 #define UDP_READ_BUFFER_SIZE 65536 #define UDP_MAX_PAYLOAD_SIZE 1400 #define UDP_HEADER_SIZE 8 @@ -321,6 +322,8 @@ struct thread_stats { uint64_t response_obj_bytes; uint64_t response_obj_total; uint64_t response_obj_free; + uint64_t read_buf_bytes; + uint64_t read_buf_bytes_free; }; /** @@ -593,6 +596,7 @@ typedef struct { struct thread_stats stats; /* Stats generated by this thread */ struct conn_queue *new_conn_queue; /* queue of new connections to handle */ cache_t *resp_cache; /* response objects */ + cache_t *rbuf_cache; /* static-sized read buffers */ #ifdef EXTSTORE cache_t *io_cache; /* IO objects */ void *storage; /* data object for storage system */ @@ -662,6 +666,7 @@ struct conn { bool set_stale; bool mset_res; /** uses mset format for return code */ bool close_after_write; /** flush write then move to close connection */ + bool rbuf_malloced; /** read buffer was malloc'ed for ascii mget, needs free() */ #ifdef TLS SSL *ssl; char *ssl_wbuf; @@ -1,7 +1,7 @@ #!/usr/bin/perl use strict; -use Test::More tests => 539; +use Test::More tests => 37991; use FindBin qw($Bin); use lib "$Bin/lib"; use MemcachedTest; @@ -97,3 +97,23 @@ while ($len < 1024*1028) { $len += 2048; } +{ + # Test large ASCII multigets. 
+ print $sock "set foobarbaz 0 0 2\r\nhi\r\n"; + is(scalar <$sock>, "STORED\r\n", "base foo stored"); + + my $len = 1024 * 128; + my $klist = ''; + my $kcount = 0; + for (my $x = 0; $x < $len; $x += 7) { + $klist .= "foobarbaz "; + $kcount++; + } + print $sock "get $klist\r\n"; + + while ($kcount--) { + is(scalar <$sock>, "VALUE foobarbaz 0 2\r\n", "foo returned"); + is(scalar <$sock>, "hi\r\n", "foo 'hi' returned"); + } + is(scalar <$sock>, "END\r\n", "foo END seen"); +} @@ -26,9 +26,9 @@ my $stats = mem_stats($sock); # Test number of keys if (MemcachedTest::enabled_tls_testing()) { # when TLS is enabled, stats contains time_since_server_cert_refresh - is(scalar(keys(%$stats)), 75, "expected count of stats values"); + is(scalar(keys(%$stats)), 77, "expected count of stats values"); } else { - is(scalar(keys(%$stats)), 74, "expected count of stats values"); + is(scalar(keys(%$stats)), 76, "expected count of stats values"); } # Test initial state @@ -411,6 +411,13 @@ static void setup_thread(LIBEVENT_THREAD *me) { fprintf(stderr, "Failed to create response cache\n"); exit(EXIT_FAILURE); } + + me->rbuf_cache = cache_create("rbuf", READ_BUFFER_SIZE, sizeof(char *), NULL, NULL); + if (me->rbuf_cache == NULL) { + fprintf(stderr, "Failed to create read buffer cache\n"); + exit(EXIT_FAILURE); + } + #ifdef EXTSTORE me->io_cache = cache_create("io", sizeof(io_wrap), sizeof(char*), NULL, NULL); if (me->io_cache == NULL) { @@ -805,6 +812,8 @@ void threadlocal_stats_aggregate(struct thread_stats *stats) { stats->response_obj_bytes += threads[ii].resp_cache->total * sizeof(mc_resp); stats->response_obj_total += threads[ii].resp_cache->total; stats->response_obj_free += threads[ii].resp_cache->freecurr; + stats->read_buf_bytes += threads[ii].rbuf_cache->total * READ_BUFFER_SIZE; + stats->read_buf_bytes_free += threads[ii].rbuf_cache->freecurr * READ_BUFFER_SIZE; pthread_mutex_unlock(&threads[ii].stats.mutex); } } |