diff options
author | dormando <dormando@rydia.net> | 2016-07-30 12:42:23 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2016-08-19 17:34:55 -0700 |
commit | a8347f6d0913a3cf22d30400b66bfcedf610ebf9 (patch) | |
tree | 1ede0116490e8567bfd35ccf9382d23829620a0b | |
parent | c793bae62ebbf9b0c2b0663fcbd33684262e3062 (diff) | |
download | memcached-a8347f6d0913a3cf22d30400b66bfcedf610ebf9.tar.gz |
prototype functionality for LRU metadumper
Functionality is nearly all there. A handful of FIXMEs and TODOs to address.
From there it needs to be refactored into something proper.
-rw-r--r-- | items.c | 195 | ||||
-rw-r--r-- | items.h | 3 | ||||
-rw-r--r-- | memcached.c | 33 | ||||
-rw-r--r-- | memcached.h | 9 | ||||
-rw-r--r-- | util.c | 35 | ||||
-rw-r--r-- | util.h | 4 |
6 files changed, 260 insertions, 19 deletions
@@ -13,6 +13,7 @@ #include <time.h> #include <assert.h> #include <unistd.h> +#include <poll.h> /* Forward Declarations */ static void item_link_q(item *it); @@ -56,9 +57,17 @@ typedef struct { bool run_complete; } crawlerstats_t; +typedef struct { + void *c; /* original connection structure. still with source thread attached. */ + int sfd; /* client fd. */ + bipbuf_t *buf; /* output buffer */ + char *cbuf; /* current buffer */ +} crawler_client_t; + static item *heads[LARGEST_ID]; static item *tails[LARGEST_ID]; static crawler crawlers[LARGEST_ID]; +static crawler_client_t crawler_client; static itemstats_t itemstats[LARGEST_ID]; static unsigned int sizes[LARGEST_ID]; static uint64_t sizes_bytes[LARGEST_ID]; @@ -91,7 +100,7 @@ void item_stats_reset(void) { static int lru_pull_tail(const int orig_id, const int cur_lru, const uint64_t total_bytes, const bool do_evict); -static int lru_crawler_start(uint32_t id, uint32_t remaining); +static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type); /* Get the next CAS id for a new item. */ /* TODO: refactor some atomics for this. */ @@ -1050,7 +1059,7 @@ static void lru_maintainer_crawler_check(void) { crawlerstats_t *s = &crawlerstats[i]; /* We've not successfully kicked off a crawl yet. */ if (last_crawls[i] == 0) { - if (lru_crawler_start(i, 0) > 0) { + if (lru_crawler_start(i, 0, CRAWLER_EXPIRED) > 0) { last_crawls[i] = current_time; } } @@ -1336,6 +1345,123 @@ static void item_crawler_evaluate(item *search, uint32_t hv, int i) { } } +static void item_crawler_metadump(item *it, uint32_t hv, int i) { + //int slab_id = CLEAR_LRU(i); + char keybuf[KEY_MAX_LENGTH * 3 + 1]; + int is_flushed = item_is_flushed(it); + /* Ignore expired content. */ + if ((it->exptime != 0 && it->exptime < current_time) + || is_flushed) { + refcount_decr(&it->refcount); + return; + } + // TODO: uriencode directly into the buffer. 
+ uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_LENGTH * 3 + 1); + int total = snprintf(crawler_client.cbuf, 4096, + "key=%s exp=%ld la=%u cas=%llu fetch=%s\n", + keybuf, + (it->exptime == 0) ? -1 : (long)it->exptime + process_started, + it->time, + (unsigned long long)ITEM_get_cas(it), + (it->it_flags & ITEM_FETCHED) ? "yes" : "no"); + refcount_decr(&it->refcount); + // TODO: some way of tracking the errors. these are very unlikely though. + if (total >= 4096 || total <= 0) { + /* Failed to write, don't push it. */ + return; + } + bipbuf_push(crawler_client.buf, total); +} + +static void item_crawler_close_client(crawler_client_t *c) { + fprintf(stderr, "CRAWLER: Closing client\n"); + sidethread_conn_close(c->c); + c->c = NULL; + c->cbuf = NULL; + bipbuf_free(c->buf); + c->buf = NULL; +} + +static int item_crawler_metadump_poll(crawler_client_t *c) { + unsigned char *data; + unsigned int data_size = 0; + struct pollfd to_poll[1]; + to_poll[0].fd = c->sfd; + to_poll[0].events = POLLOUT; + + int ret = poll(to_poll, 1, 1000); + + if (ret < 0) { + // fatal. + return -1; + } + + if (ret == 0) return 0; + + if (to_poll[0].revents & POLLIN) { + char buf[1]; + int res = read(c->sfd, buf, 1); + if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) { + item_crawler_close_client(c); + return -1; + } + } + if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) { + if (to_poll[0].revents & (POLLHUP|POLLERR)) { + item_crawler_close_client(c); + return -1; + } else if (to_poll[0].revents & POLLOUT) { + int total = write(c->sfd, data, data_size); + if (total == -1) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + item_crawler_close_client(c); + return -1; + } + } else if (total == 0) { + item_crawler_close_client(c); + return -1; + } else { + bipbuf_poll(c->buf, total); + } + } + } + return 0; +} + +/* Grab some space to work with, if none exists, run the poll() loop and wait + * for it to clear up or close. + * Return NULL if closed. 
+ */ +static int item_crawler_metadump_getbuf(crawler_client_t *c) { + void *buf = NULL; + if (c->c == NULL) return -1; + /* not enough space. */ + while ((buf = bipbuf_request(c->buf, 4096)) == NULL) { + // TODO: max loops before closing. + int ret = item_crawler_metadump_poll(c); + if (ret < 0) return ret; + } + + c->cbuf = buf; + return 0; +} + +static void item_crawler_class_done(int i) { + crawlers[i].it_flags = 0; + crawler_count--; + crawler_unlink_q((item *)&crawlers[i]); + pthread_mutex_unlock(&lru_locks[i]); + pthread_mutex_lock(&lru_crawler_stats_lock); + crawlerstats[CLEAR_LRU(i)].end_time = current_time; + crawlerstats[CLEAR_LRU(i)].run_complete = true; + pthread_mutex_unlock(&lru_crawler_stats_lock); + + if (crawlers[i].type == CRAWLER_METADUMP && crawler_client.c != NULL) { + item_crawler_metadump_poll(&crawler_client); + item_crawler_close_client(&crawler_client); + } +} + static void *item_crawler_thread(void *arg) { int i; int crawls_persleep = settings.crawls_persleep; @@ -1356,20 +1482,23 @@ static void *item_crawler_thread(void *arg) { if (crawlers[i].it_flags != 1) { continue; } + + /* Get memory from bipbuf, if client has no space, flush. */ + // TODO: Will become a callback run here. 
+ if (crawlers[i].type == CRAWLER_METADUMP) { + int ret = item_crawler_metadump_getbuf(&crawler_client); + if (ret != 0) { + item_crawler_class_done(i); + continue; + } + } pthread_mutex_lock(&lru_locks[i]); search = crawler_crawl_q((item *)&crawlers[i]); if (search == NULL || (crawlers[i].remaining && --crawlers[i].remaining < 1)) { if (settings.verbose > 2) fprintf(stderr, "Nothing left to crawl for %d\n", i); - crawlers[i].it_flags = 0; - crawler_count--; - crawler_unlink_q((item *)&crawlers[i]); - pthread_mutex_unlock(&lru_locks[i]); - pthread_mutex_lock(&lru_crawler_stats_lock); - crawlerstats[CLEAR_LRU(i)].end_time = current_time; - crawlerstats[CLEAR_LRU(i)].run_complete = true; - pthread_mutex_unlock(&lru_crawler_stats_lock); + item_crawler_class_done(i); continue; } uint32_t hv = hash(ITEM_key(search), search->nkey); @@ -1392,13 +1521,17 @@ static void *item_crawler_thread(void *arg) { /* Frees the item or decrements the refcount. */ /* Interface for this could improve: do the free/decr here * instead? */ + if (crawlers[i].type == CRAWLER_METADUMP) + pthread_mutex_unlock(&lru_locks[i]); + pthread_mutex_lock(&lru_crawler_stats_lock); - item_crawler_evaluate(search, hv, i); + crawlers[i].eval(search, hv, i); pthread_mutex_unlock(&lru_crawler_stats_lock); if (hold_lock) item_trylock_unlock(hold_lock); - pthread_mutex_unlock(&lru_locks[i]); + if (crawlers[i].type != CRAWLER_METADUMP) + pthread_mutex_unlock(&lru_locks[i]); if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) { usleep(settings.lru_crawler_sleep); @@ -1470,7 +1603,7 @@ int start_item_crawler_thread(void) { /* 'remaining' is passed in so the LRU maintainer thread can scrub the whole * LRU every time. 
*/ -static int do_lru_crawler_start(uint32_t id, uint32_t remaining) { +static int do_lru_crawler_start(uint32_t id, uint32_t remaining, enum crawler_run_type type) { int i; uint32_t sid; uint32_t tocrawl[3]; @@ -1493,6 +1626,16 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining) { crawlers[sid].time = 0; crawlers[sid].remaining = remaining; crawlers[sid].slabs_clsid = sid; + crawlers[sid].type = type; + switch (type) { + case CRAWLER_METADUMP: + crawlers[sid].eval = item_crawler_metadump; + break; + case CRAWLER_EXPIRED: + default: + crawlers[sid].eval = item_crawler_evaluate; + break; + } crawler_link_q((item *)&crawlers[sid]); crawler_count++; starts++; @@ -1512,12 +1655,12 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining) { return starts; } -static int lru_crawler_start(uint32_t id, uint32_t remaining) { +static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type) { int starts; if (pthread_mutex_trylock(&lru_crawler_lock) != 0) { return 0; } - starts = do_lru_crawler_start(id, remaining); + starts = do_lru_crawler_start(id, remaining, type); if (starts) { pthread_cond_signal(&lru_crawler_cond); } @@ -1525,12 +1668,30 @@ static int lru_crawler_start(uint32_t id, uint32_t remaining) { return starts; } +/* FIXME: Temporary hack since we can't yet pass this information into + * lru_crawler_crawl.. which has the proper locks/etc. + * Multiple parallel commands could race, but isn't part of the testing. + */ +int lru_crawler_set_client(void *c, const int sfd) { + if (crawler_client.c != NULL) { + return -1; + } + crawler_client.c = c; + crawler_client.sfd = sfd; + + crawler_client.buf = bipbuf_new(1024 * 128); + if (crawler_client.buf == NULL) { + return -2; + } + return 0; +} + /* FIXME: Split this into two functions: one to kick a crawler for a sid, and one to * parse the string. LRU maintainer code is generating a string to set up a * sid. * Also only clear the crawlerstats once per sid. 
*/ -enum crawler_result_type lru_crawler_crawl(char *slabs) { +enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type) { char *b = NULL; uint32_t sid = 0; int starts = 0; @@ -1561,7 +1722,7 @@ enum crawler_result_type lru_crawler_crawl(char *slabs) { for (sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { if (tocrawl[sid]) - starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl); + starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl, type); } if (starts) { pthread_cond_signal(&lru_crawler_cond); @@ -47,6 +47,7 @@ void lru_maintainer_resume(void); int start_item_crawler_thread(void); int stop_item_crawler_thread(void); int init_lru_crawler(void); -enum crawler_result_type lru_crawler_crawl(char *slabs); +enum crawler_result_type lru_crawler_crawl(char *slabs, enum crawler_run_type); +int lru_crawler_set_client(void *c, const int sfd); /* FIXME: Temporary. */ void lru_crawler_pause(void); void lru_crawler_resume(void); diff --git a/memcached.c b/memcached.c index 406180e..2cf849c 100644 --- a/memcached.c +++ b/memcached.c @@ -3964,7 +3964,7 @@ static void process_command(conn *c, char *command) { return; } - rv = lru_crawler_crawl(tokens[2].value); + rv = lru_crawler_crawl(tokens[2].value, CRAWLER_EXPIRED); switch(rv) { case CRAWLER_OK: out_string(c, "OK"); @@ -3980,6 +3980,33 @@ static void process_command(conn *c, char *command) { break; } return; + } else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "metadump") == 0) { + if (settings.lru_crawler == false) { + out_string(c, "CLIENT_ERROR lru crawler disabled"); + return; + } + + // FIXME: check response code. + lru_crawler_set_client(c, c->sfd); + int rv = lru_crawler_crawl(tokens[2].value, CRAWLER_METADUMP); + switch(rv) { + case CRAWLER_OK: + out_string(c, "OK"); + // TODO: Don't reuse conn_watch here. 
+ conn_set_state(c, conn_watch); + event_del(&c->event); + break; + case CRAWLER_RUNNING: + out_string(c, "BUSY currently processing crawler request"); + break; + case CRAWLER_BADCLASS: + out_string(c, "BADCLASS invalid class id"); + break; + case CRAWLER_NOTSTARTED: + out_string(c, "NOTSTARTED no items to crawl"); + break; + } + return; } else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "tocrawl") == 0) { uint32_t tocrawl; if (!safe_strtoul(tokens[2].value, &tocrawl)) { @@ -4015,6 +4042,7 @@ static void process_command(conn *c, char *command) { } else { out_string(c, "ERROR failed to stop lru crawler thread"); } + } else if ((strcmp(tokens[COMMAND_TOKEN + 1].value, "metadump") == 0)) { } else { out_string(c, "ERROR"); } @@ -6372,6 +6400,9 @@ int main (int argc, char **argv) { /* Drop privileges no longer needed */ drop_privileges(); + /* Initialize the uriencode lookup table. */ + uriencode_init(); + /* enter the event loop */ if (event_base_loop(main_base, 0) != 0) { retval = EXIT_FAILURE; diff --git a/memcached.h b/memcached.h index eb1fce6..a98d60f 100644 --- a/memcached.h +++ b/memcached.h @@ -419,6 +419,13 @@ typedef struct _stritem { /* then data with terminating \r\n (no terminating null; it's binary!) */ } item; +typedef void (*crawler_eval_func)(item *it, uint32_t hv, int slab_cls); + +// TODO: If we eventually want user loaded modules, we can't use an enum :( +enum crawler_run_type { + CRAWLER_EXPIRED=0, CRAWLER_METADUMP +}; + typedef struct { struct _stritem *next; struct _stritem *prev; @@ -432,6 +439,8 @@ typedef struct { uint8_t slabs_clsid;/* which slab class we're in */ uint8_t nkey; /* key length, w/terminating null and padding */ uint32_t remaining; /* Max keys to crawl per slab per invocation */ + enum crawler_run_type type; /* which module to use during run */ + crawler_eval_func eval; /* The function to run with the locked item */ } crawler; /* Header when an item is actually a chunk of another item. 
*/ @@ -8,6 +8,41 @@ #include "memcached.h" +static char *uriencode_map[256]; +static char uriencode_str[768]; + +void uriencode_init(void) { + int x; + char *str = uriencode_str; + for (x = 0; x < 256; x++) { + if (isalnum(x) || x == '-' || x == '.' || x == '_' || x == '~') { + uriencode_map[x] = NULL; + } else { + snprintf(str, 4, "%%%02X", x); + uriencode_map[x] = str; + str += 3; /* lobbing off the \0 is fine */ + } + } +} + +bool uriencode(const char *src, char *dst, const size_t srclen, const size_t dstlen) { + int x; + size_t d = 0; + for (x = 0; x < srclen; x++) { + if (d + 4 >= dstlen) + return false; + if (uriencode_map[(unsigned char) src[x]] != NULL) { + memcpy(&dst[d], uriencode_map[(unsigned char) src[x]], 3); + d += 3; + } else { + dst[d] = src[x]; + d++; + } + } + dst[d] = '\0'; + return true; +} + /* Avoid warnings on solaris, where isspace() is an index into an array, and gcc uses signed chars */ #define xisspace(c) isspace((unsigned char)c) @@ -1,3 +1,7 @@ +/* fast-enough functions for uriencoding strings. */ +void uriencode_init(void); +bool uriencode(const char *src, char *dst, const size_t srclen, const size_t dstlen); + /* * Wrappers around strtoull/strtoll that are safer and easier to * use. For tests and assumptions, see internal_tests.c. |