diff options
-rw-r--r-- | memcached.c | 115 | ||||
-rw-r--r-- | memcached.h | 12 | ||||
-rw-r--r-- | sizes.c | 3 |
3 files changed, 125 insertions, 5 deletions
diff --git a/memcached.c b/memcached.c index 8b5ee89..adb323c 100644 --- a/memcached.c +++ b/memcached.c @@ -1036,16 +1036,122 @@ static void resp_add_chunked_iov(mc_resp *resp, const void *buf, int len) { resp_add_iov(resp, buf, len); } +// resp_allocate and resp_free are a wrapper around read buffers which makes +// read buffers the only network memory to track. +// Normally this would be too excessive. In this case it allows end users to +// track a single memory limit for ephemeral connection buffers. +// Fancy bit twiddling tricks are avoided to help keep this straightforward. +static mc_resp* resp_allocate(conn *c) { + LIBEVENT_THREAD *th = c->thread; + mc_resp *resp = NULL; + mc_resp_bundle *b = th->open_bundle; + + if (b != NULL) { + for (int i = 0; i < MAX_RESP_PER_BUNDLE; i++) { + // loop around starting from the most likely to be free + int x = (i + b->next_check) % MAX_RESP_PER_BUNDLE; + if (b->r[x].free) { + resp = &b->r[x]; + b->next_check = x+1; + break; + } + } + + if (resp != NULL) { + b->refcount++; + resp->free = false; + if (b->refcount == MAX_RESP_PER_BUNDLE) { + assert(b->next == NULL); + // We only allocate off the head. Assign new head. + th->open_bundle = b->prev; + // Remove ourselves from the list. + if (b->prev) { + b->prev->next = 0; + } + } + } + } + + if (resp == NULL) { + assert(th->open_bundle == NULL); + b = do_cache_alloc(th->rbuf_cache); + if (b) { + b->next_check = 1; + b->refcount = 1; + for (int i = 0; i < MAX_RESP_PER_BUNDLE; i++) { + b->r[i].bundle = b; + b->r[i].free = true; + } + b->next = 0; + b->prev = 0; + th->open_bundle = b; + resp = &b->r[0]; + } else { + return NULL; + } + } + + return resp; +} + +static void resp_free(conn *c, mc_resp *resp) { + LIBEVENT_THREAD *th = c->thread; + mc_resp_bundle *b = resp->bundle; + + resp->free = true; + b->refcount--; + if (b->refcount == 0) { + if (b == th->open_bundle && b->next == 0) { + // This is the final bundle. Just hold and reuse to skip init loop + assert(b->prev == 0); + b->next_check = 0; + } else { + // Assert that we're either in the list or at the head. + assert((b->next || b->prev) || b == th->open_bundle); + + // unlink from list. + mc_resp_bundle **head = &th->open_bundle; + if (*head == b) *head = b->next; + // Not tracking the tail. + assert(b->next != b && b->prev != b); + + if (b->next) b->next->prev = b->prev; + if (b->prev) b->prev->next = b->next; + + // Now completely done with this buffer. + do_cache_free(th->rbuf_cache, b); + } + } else { + mc_resp_bundle **head = &th->open_bundle; + // NOTE: since we're not tracking tail, latest free ends up in head. + if (b == th->open_bundle || (b->prev || b->next)) { + // If we're already linked, leave it in place to save CPU. + } else { + // Non-zero refcount, need to link into the freelist. + b->prev = 0; + b->next = *head; + if (b->next) b->next->prev = b; + *head = b; + } + + } +} + static bool resp_start(conn *c) { - mc_resp *resp = do_cache_alloc(c->thread->resp_cache); + mc_resp *resp = resp_allocate(c); if (!resp) { THR_STATS_LOCK(c); c->thread->stats.response_obj_oom++; THR_STATS_UNLOCK(c); return false; } - // FIXME: make wbuf indirect or use offsetof to zero up until wbuf - memset(resp, 0, sizeof(*resp)); + // Skip zeroing the bundle pointer at the start. + memset((char *)resp + sizeof(mc_resp_bundle*), 0, sizeof(*resp) - sizeof(mc_resp_bundle*)); + // TODO: this next line works. memset _does_ show up significantly under + // perf reports due to zeroing out the entire resp->wbuf. before swapping + // the lines more validation work should be done to ensure wbuf's aren't + // accidentally reused without being written to. + //memset((char *)resp + sizeof(mc_resp_bundle*), 0, offsetof(mc_resp, wbuf)); if (!c->resp_head) { c->resp_head = resp; } @@ -1081,8 +1187,7 @@ static mc_resp* resp_finish(conn *c, mc_resp *resp) { if (c->resp == resp) { c->resp = NULL; } - resp->free = true; - do_cache_free(c->thread->resp_cache, resp); + resp_free(c, resp); return next; } diff --git a/memcached.h b/memcached.h index 3254afa..ba944ef 100644 --- a/memcached.h +++ b/memcached.h @@ -602,6 +602,7 @@ typedef struct { unsigned short page_id; /* from IO header */ } item_hdr; #endif +typedef struct _mc_resp_bundle mc_resp_bundle; typedef struct { pthread_t thread_id; /* unique ID of this thread */ struct event_base *base; /* libevent handle this thread uses */ @@ -612,6 +613,7 @@ typedef struct { struct conn_queue *new_conn_queue; /* queue of new connections to handle */ cache_t *resp_cache; /* response objects */ cache_t *rbuf_cache; /* static-sized read buffers */ + mc_resp_bundle *open_bundle; #ifdef EXTSTORE cache_t *io_cache; /* IO objects */ void *storage; /* data object for storage system */ @@ -629,6 +631,7 @@ typedef struct { */ #define MC_RESP_IOVCOUNT 4 typedef struct _mc_resp { + mc_resp_bundle *bundle; // ptr back to bundle struct _mc_resp *next; // choo choo. int wbytes; // bytes to write out of wbuf: might be able to nuke this. int tosend; // total bytes to send for this response @@ -655,6 +658,15 @@ typedef struct _mc_resp { char wbuf[WRITE_BUFFER_SIZE]; } mc_resp; +#define MAX_RESP_PER_BUNDLE ((READ_BUFFER_SIZE - sizeof(mc_resp_bundle)) / sizeof(mc_resp)) +struct _mc_resp_bundle { + uint8_t refcount; + uint8_t next_check; // next object to check on assignment. + struct _mc_resp_bundle *next; + struct _mc_resp_bundle *prev; + mc_resp r[]; +}; + typedef struct conn conn; #ifdef EXTSTORE typedef struct _io_wrap { @@ -22,6 +22,9 @@ int main(int argc, char **argv) { display("Libevent thread", sizeof(LIBEVENT_THREAD) - sizeof(struct thread_stats)); display("Connection", sizeof(conn)); + display("Response object", sizeof(mc_resp)); + display("Response bundle", sizeof(mc_resp_bundle)); + display("Response objects per bundle", MAX_RESP_PER_BUNDLE); printf("----------------------------------------\n"); |