diff options
author | dormando <dormando@rydia.net> | 2016-07-30 12:42:23 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2016-08-19 17:34:55 -0700 |
commit | a8347f6d0913a3cf22d30400b66bfcedf610ebf9 (patch) | |
tree | 1ede0116490e8567bfd35ccf9382d23829620a0b /items.c | |
parent | c793bae62ebbf9b0c2b0663fcbd33684262e3062 (diff) | |
download | memcached-a8347f6d0913a3cf22d30400b66bfcedf610ebf9.tar.gz |
prototype functionality for LRU metadumper
Functionality is nearly all there. A handful of FIXME's and TODO's to address.
From there it needs to be refactored into something proper.
Diffstat (limited to 'items.c')
-rw-r--r-- | items.c | 195 |
1 files changed, 178 insertions, 17 deletions
@@ -13,6 +13,7 @@ #include <time.h> #include <assert.h> #include <unistd.h> +#include <poll.h> /* Forward Declarations */ static void item_link_q(item *it); @@ -56,9 +57,17 @@ typedef struct { bool run_complete; } crawlerstats_t; +typedef struct { + void *c; /* original connection structure. still with source thread attached. */ + int sfd; /* client fd. */ + bipbuf_t *buf; /* output buffer */ + char *cbuf; /* current buffer */ +} crawler_client_t; + static item *heads[LARGEST_ID]; static item *tails[LARGEST_ID]; static crawler crawlers[LARGEST_ID]; +static crawler_client_t crawler_client; static itemstats_t itemstats[LARGEST_ID]; static unsigned int sizes[LARGEST_ID]; static uint64_t sizes_bytes[LARGEST_ID]; @@ -91,7 +100,7 @@ void item_stats_reset(void) { static int lru_pull_tail(const int orig_id, const int cur_lru, const uint64_t total_bytes, const bool do_evict); -static int lru_crawler_start(uint32_t id, uint32_t remaining); +static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type); /* Get the next CAS id for a new item. */ /* TODO: refactor some atomics for this. */ @@ -1050,7 +1059,7 @@ static void lru_maintainer_crawler_check(void) { crawlerstats_t *s = &crawlerstats[i]; /* We've not successfully kicked off a crawl yet. */ if (last_crawls[i] == 0) { - if (lru_crawler_start(i, 0) > 0) { + if (lru_crawler_start(i, 0, CRAWLER_EXPIRED) > 0) { last_crawls[i] = current_time; } } @@ -1336,6 +1345,123 @@ static void item_crawler_evaluate(item *search, uint32_t hv, int i) { } } +static void item_crawler_metadump(item *it, uint32_t hv, int i) { + //int slab_id = CLEAR_LRU(i); + char keybuf[KEY_MAX_LENGTH * 3 + 1]; + int is_flushed = item_is_flushed(it); + /* Ignore expired content. */ + if ((it->exptime != 0 && it->exptime < current_time) + || is_flushed) { + refcount_decr(&it->refcount); + return; + } + // TODO: uriencode directly into the buffer. + uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_LENGTH * 3 + 1); + int total = snprintf(crawler_client.cbuf, 4096, + "key=%s exp=%ld la=%u cas=%llu fetch=%s\n", + keybuf, + (it->exptime == 0) ? -1 : (long)it->exptime + process_started, + it->time, + (unsigned long long)ITEM_get_cas(it), + (it->it_flags & ITEM_FETCHED) ? "yes" : "no"); + refcount_decr(&it->refcount); + // TODO: some way of tracking the errors. these are very unlikely though. + if (total >= 4096 || total <= 0) { + /* Failed to write, don't push it. */ + return; + } + bipbuf_push(crawler_client.buf, total); +} + +static void item_crawler_close_client(crawler_client_t *c) { + fprintf(stderr, "CRAWLER: Closing client\n"); + sidethread_conn_close(c->c); + c->c = NULL; + c->cbuf = NULL; + bipbuf_free(c->buf); + c->buf = NULL; +} + +static int item_crawler_metadump_poll(crawler_client_t *c) { + unsigned char *data; + unsigned int data_size = 0; + struct pollfd to_poll[1]; + to_poll[0].fd = c->sfd; + to_poll[0].events = POLLOUT; + + int ret = poll(to_poll, 1, 1000); + + if (ret < 0) { + // fatal. + return -1; + } + + if (ret == 0) return 0; + + if (to_poll[0].revents & POLLIN) { + char buf[1]; + int res = read(c->sfd, buf, 1); + if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) { + item_crawler_close_client(c); + return -1; + } + } + if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) { + if (to_poll[0].revents & (POLLHUP|POLLERR)) { + item_crawler_close_client(c); + return -1; + } else if (to_poll[0].revents & POLLOUT) { + int total = write(c->sfd, data, data_size); + if (total == -1) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + item_crawler_close_client(c); + return -1; + } + } else if (total == 0) { + item_crawler_close_client(c); + return -1; + } else { + bipbuf_poll(c->buf, total); + } + } + } + return 0; +} + +/* Grab some space to work with, if none exists, run the poll() loop and wait + * for it to clear up or close. + * Return NULL if closed. + */ +static int item_crawler_metadump_getbuf(crawler_client_t *c) { + void *buf = NULL; + if (c->c == NULL) return -1; + /* not enough space. */ + while ((buf = bipbuf_request(c->buf, 4096)) == NULL) { + // TODO: max loops before closing. + int ret = item_crawler_metadump_poll(c); + if (ret < 0) return ret; + } + + c->cbuf = buf; + return 0; +} + +static void item_crawler_class_done(int i) { + crawlers[i].it_flags = 0; + crawler_count--; + crawler_unlink_q((item *)&crawlers[i]); + pthread_mutex_unlock(&lru_locks[i]); + pthread_mutex_lock(&lru_crawler_stats_lock); + crawlerstats[CLEAR_LRU(i)].end_time = current_time; + crawlerstats[CLEAR_LRU(i)].run_complete = true; + pthread_mutex_unlock(&lru_crawler_stats_lock); + + if (crawlers[i].type == CRAWLER_METADUMP && crawler_client.c != NULL) { + item_crawler_metadump_poll(&crawler_client); + item_crawler_close_client(&crawler_client); + } +} + static void *item_crawler_thread(void *arg) { int i; int crawls_persleep = settings.crawls_persleep; @@ -1356,20 +1482,23 @@ static void *item_crawler_thread(void *arg) { if (crawlers[i].it_flags != 1) { continue; } + + /* Get memory from bipbuf, if client has no space, flush. */ + // TODO: Will become a callback run here. + if (crawlers[i].type == CRAWLER_METADUMP) { + int ret = item_crawler_metadump_getbuf(&crawler_client); + if (ret != 0) { + item_crawler_class_done(i); + continue; + } + } pthread_mutex_lock(&lru_locks[i]); search = crawler_crawl_q((item *)&crawlers[i]); if (search == NULL || (crawlers[i].remaining && --crawlers[i].remaining < 1)) { if (settings.verbose > 2) fprintf(stderr, "Nothing left to crawl for %d\n", i); - crawlers[i].it_flags = 0; - crawler_count--; - crawler_unlink_q((item *)&crawlers[i]); - pthread_mutex_unlock(&lru_locks[i]); - pthread_mutex_lock(&lru_crawler_stats_lock); - crawlerstats[CLEAR_LRU(i)].end_time = current_time; - crawlerstats[CLEAR_LRU(i)].run_complete = true; - pthread_mutex_unlock(&lru_crawler_stats_lock); + item_crawler_class_done(i); continue; } uint32_t hv = hash(ITEM_key(search), search->nkey); @@ -1392,13 +1521,17 @@ static void *item_crawler_thread(void *arg) { /* Frees the item or decrements the refcount. */ /* Interface for this could improve: do the free/decr here * instead? */ + if (crawlers[i].type == CRAWLER_METADUMP) + pthread_mutex_unlock(&lru_locks[i]); + pthread_mutex_lock(&lru_crawler_stats_lock); - item_crawler_evaluate(search, hv, i); + crawlers[i].eval(search, hv, i); pthread_mutex_unlock(&lru_crawler_stats_lock); if (hold_lock) item_trylock_unlock(hold_lock); - pthread_mutex_unlock(&lru_locks[i]); + if (crawlers[i].type != CRAWLER_METADUMP) + pthread_mutex_unlock(&lru_locks[i]); if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) { usleep(settings.lru_crawler_sleep); @@ -1470,7 +1603,7 @@ int start_item_crawler_thread(void) { /* 'remaining' is passed in so the LRU maintainer thread can scrub the whole * LRU every time. */ -static int do_lru_crawler_start(uint32_t id, uint32_t remaining) { +static int do_lru_crawler_start(uint32_t id, uint32_t remaining, enum crawler_run_type type) { int i; uint32_t sid; uint32_t tocrawl[3]; @@ -1493,6 +1626,16 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining) { crawlers[sid].time = 0; crawlers[sid].remaining = remaining; crawlers[sid].slabs_clsid = sid; + crawlers[sid].type = type; + switch (type) { + case CRAWLER_METADUMP: + crawlers[sid].eval = item_crawler_metadump; + break; + case CRAWLER_EXPIRED: + default: + crawlers[sid].eval = item_crawler_evaluate; + break; + } crawler_link_q((item *)&crawlers[sid]); crawler_count++; starts++; @@ -1512,12 +1655,12 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining) { return starts; } -static int lru_crawler_start(uint32_t id, uint32_t remaining) { +static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type) { int starts; if (pthread_mutex_trylock(&lru_crawler_lock) != 0) { return 0; } - starts = do_lru_crawler_start(id, remaining); + starts = do_lru_crawler_start(id, remaining, type); if (starts) { pthread_cond_signal(&lru_crawler_cond); } @@ -1525,12 +1668,30 @@ static int lru_crawler_start(uint32_t id, uint32_t remaining) { return starts; } +/* FIXME: Temporary hack since we can't yet pass this information into + * lru_crawler_crawl.. which has the proper locks/etc. + * Multiple parallel commands could race, but isn't part of the testing. + */ +int lru_crawler_set_client(void *c, const int sfd) { + if (crawler_client.c != NULL) { + return -1; + } + crawler_client.c = c; + crawler_client.sfd = sfd; + + crawler_client.buf = bipbuf_new(1024 * 128); + if (crawler_client.buf == NULL) { + return -2; + } + return 0; +} + /* FIXME: Split this into two functions: one to kick a crawler for a sid, and one to * parse the string. LRU maintainer code is generating a string to set up a * sid. * Also only clear the crawlerstats once per sid. */ -enum crawler_result_type lru_crawler_crawl(char *slabs) { +enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type) { char *b = NULL; uint32_t sid = 0; int starts = 0; @@ -1561,7 +1722,7 @@ enum crawler_result_type lru_crawler_crawl(char *slabs) { for (sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) { if (tocrawl[sid]) - starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl); + starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl, type); } if (starts) { pthread_cond_signal(&lru_crawler_cond); |