author     dormando <dormando@rydia.net>    2016-07-30 12:42:23 -0700
committer  dormando <dormando@rydia.net>    2016-08-19 17:34:55 -0700
commit     a8347f6d0913a3cf22d30400b66bfcedf610ebf9 (patch)
tree       1ede0116490e8567bfd35ccf9382d23829620a0b /items.c
parent     c793bae62ebbf9b0c2b0663fcbd33684262e3062 (diff)
download   memcached-a8347f6d0913a3cf22d30400b66bfcedf610ebf9.tar.gz
prototype functionality for LRU metadumper
Functionality is nearly all there. A handful of FIXMEs and TODOs remain to address; from there it needs to be refactored into something proper.
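For context: each key the dumper visits produces one uriencoded line on the requesting client's connection, in the format assembled by item_crawler_metadump() below. Illustrative output (values made up; exp=-1 means the item never expires, and la is the last-access time as stored in it->time):

    key=foo exp=1469912543 la=5403 cas=1234 fetch=yes
    key=bar%20baz exp=-1 la=5410 cas=1240 fetch=no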
Diffstat (limited to 'items.c')
-rw-r--r--  items.c  195
1 file changed, 178 insertions(+), 17 deletions(-)
diff --git a/items.c b/items.c
index 73a0005..c81b193 100644
--- a/items.c
+++ b/items.c
@@ -13,6 +13,7 @@
#include <time.h>
#include <assert.h>
#include <unistd.h>
+#include <poll.h>
/* Forward Declarations */
static void item_link_q(item *it);
@@ -56,9 +57,17 @@ typedef struct {
bool run_complete;
} crawlerstats_t;
+typedef struct {
+ void *c; /* original connection structure. still with source thread attached. */
+ int sfd; /* client fd. */
+ bipbuf_t *buf; /* output buffer */
+ char *cbuf; /* current buffer */
+} crawler_client_t;
+
static item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];
static crawler crawlers[LARGEST_ID];
+static crawler_client_t crawler_client;
static itemstats_t itemstats[LARGEST_ID];
static unsigned int sizes[LARGEST_ID];
static uint64_t sizes_bytes[LARGEST_ID];
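The output buffer above is a bipartite ("bip") buffer: a writer reserves a contiguous region, commits only the bytes it actually used, and a reader drains committed data from the other end. A minimal standalone sketch of the cycle this patch depends on, assuming the bipbuffer API the diff calls into (bipbuf_new/request/push/peek_all/poll/free):

    #include <stdio.h>
    #include "bipbuffer.h"

    int main(void) {
        bipbuf_t *buf = bipbuf_new(1024 * 128);          /* fixed backing store */
        if (buf == NULL) return 1;

        /* Producer side: reserve contiguous space, fill it, commit used bytes. */
        char *cbuf = (char *) bipbuf_request(buf, 4096);
        if (cbuf != NULL) {
            int total = snprintf(cbuf, 4096, "key=example exp=-1 la=0 cas=1 fetch=no\n");
            if (total > 0 && total < 4096)
                bipbuf_push(buf, total);                 /* commit 'total' bytes */
        }

        /* Consumer side: peek at committed data, "send" it, then consume it. */
        unsigned int data_size = 0;
        unsigned char *data = bipbuf_peek_all(buf, &data_size);
        if (data != NULL) {
            fwrite(data, 1, data_size, stdout);          /* stand-in for write(sfd) */
            bipbuf_poll(buf, data_size);                 /* drop what was sent */
        }

        bipbuf_free(buf);
        return 0;
    }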
@@ -91,7 +100,7 @@ void item_stats_reset(void) {
static int lru_pull_tail(const int orig_id, const int cur_lru,
const uint64_t total_bytes, const bool do_evict);
-static int lru_crawler_start(uint32_t id, uint32_t remaining);
+static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type);
/* Get the next CAS id for a new item. */
/* TODO: refactor some atomics for this. */
@@ -1050,7 +1059,7 @@ static void lru_maintainer_crawler_check(void) {
crawlerstats_t *s = &crawlerstats[i];
/* We've not successfully kicked off a crawl yet. */
if (last_crawls[i] == 0) {
- if (lru_crawler_start(i, 0) > 0) {
+ if (lru_crawler_start(i, 0, CRAWLER_EXPIRED) > 0) {
last_crawls[i] = current_time;
}
}
@@ -1336,6 +1345,123 @@ static void item_crawler_evaluate(item *search, uint32_t hv, int i) {
}
}
+static void item_crawler_metadump(item *it, uint32_t hv, int i) {
+ //int slab_id = CLEAR_LRU(i);
+ char keybuf[KEY_MAX_LENGTH * 3 + 1];
+ int is_flushed = item_is_flushed(it);
+ /* Ignore expired content. */
+ if ((it->exptime != 0 && it->exptime < current_time)
+ || is_flushed) {
+ refcount_decr(&it->refcount);
+ return;
+ }
+ // TODO: uriencode directly into the buffer.
+ uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_LENGTH * 3 + 1);
+ int total = snprintf(crawler_client.cbuf, 4096,
+ "key=%s exp=%ld la=%u cas=%llu fetch=%s\n",
+ keybuf,
+ (it->exptime == 0) ? -1 : (long)it->exptime + process_started,
+ it->time,
+ (unsigned long long)ITEM_get_cas(it),
+ (it->it_flags & ITEM_FETCHED) ? "yes" : "no");
+ refcount_decr(&it->refcount);
+ // TODO: some way of tracking the errors. these are very unlikely though.
+ if (total >= 4096 || total <= 0) {
+ /* Failed to write, don't push it. */
+ return;
+ }
+ bipbuf_push(crawler_client.buf, total);
+}
+
+static void item_crawler_close_client(crawler_client_t *c) {
+ fprintf(stderr, "CRAWLER: Closing client\n");
+ sidethread_conn_close(c->c);
+ c->c = NULL;
+ c->cbuf = NULL;
+ bipbuf_free(c->buf);
+ c->buf = NULL;
+}
+
+static int item_crawler_metadump_poll(crawler_client_t *c) {
+ unsigned char *data;
+ unsigned int data_size = 0;
+ struct pollfd to_poll[1];
+ to_poll[0].fd = c->sfd;
+ to_poll[0].events = POLLOUT;
+
+ int ret = poll(to_poll, 1, 1000);
+
+ if (ret < 0) {
+ // fatal.
+ return -1;
+ }
+
+ if (ret == 0) return 0;
+
+ if (to_poll[0].revents & POLLIN) {
+ char buf[1];
+ int res = read(c->sfd, buf, 1);
+ if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
+ item_crawler_close_client(c);
+ return -1;
+ }
+ }
+ if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) {
+ if (to_poll[0].revents & (POLLHUP|POLLERR)) {
+ item_crawler_close_client(c);
+ return -1;
+ } else if (to_poll[0].revents & POLLOUT) {
+ int total = write(c->sfd, data, data_size);
+ if (total == -1) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ item_crawler_close_client(c);
+ return -1;
+ }
+ } else if (total == 0) {
+ item_crawler_close_client(c);
+ return -1;
+ } else {
+ bipbuf_poll(c->buf, total);
+ }
+ }
+ }
+ return 0;
+}
+
+/* Grab some space to work with; if none exists, run the poll() loop and wait
+ * for it to clear up or close.
+ * Return -1 if the connection was closed.
+ */
+static int item_crawler_metadump_getbuf(crawler_client_t *c) {
+ void *buf = NULL;
+ if (c->c == NULL) return -1;
+ /* not enough space. */
+ while ((buf = bipbuf_request(c->buf, 4096)) == NULL) {
+ // TODO: max loops before closing.
+ int ret = item_crawler_metadump_poll(c);
+ if (ret < 0) return ret;
+ }
+
+ c->cbuf = buf;
+ return 0;
+}
+
+static void item_crawler_class_done(int i) {
+ crawlers[i].it_flags = 0;
+ crawler_count--;
+ crawler_unlink_q((item *)&crawlers[i]);
+ pthread_mutex_unlock(&lru_locks[i]);
+ pthread_mutex_lock(&lru_crawler_stats_lock);
+ crawlerstats[CLEAR_LRU(i)].end_time = current_time;
+ crawlerstats[CLEAR_LRU(i)].run_complete = true;
+ pthread_mutex_unlock(&lru_crawler_stats_lock);
+
+ if (crawlers[i].type == CRAWLER_METADUMP && crawler_client.c != NULL) {
+ item_crawler_metadump_poll(&crawler_client);
+ item_crawler_close_client(&crawler_client);
+ }
+}
+
static void *item_crawler_thread(void *arg) {
int i;
int crawls_persleep = settings.crawls_persleep;
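Taken together, the helpers above give the crawler thread write backpressure: item_crawler_metadump_getbuf() spins in one-second poll() rounds until 4096 bytes of buffer space open up or the client goes away, item_crawler_metadump_poll() flushes committed data whenever the socket is writable, and item_crawler_class_done() makes a final drain pass before closing. The per-item flow in the thread loop below condenses to roughly this (illustrative sketch, not the literal code):

    /* Condensed per-item flow for a CRAWLER_METADUMP run: */
    if (item_crawler_metadump_getbuf(&crawler_client) != 0) {
        item_crawler_class_done(i);            /* client closed: finish this class */
    } else {
        /* ...advance to the next item under lru_locks[i]... */
        item_crawler_metadump(search, hv, i);  /* snprintf into cbuf, then bipbuf_push */
    }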
@@ -1356,20 +1482,23 @@ static void *item_crawler_thread(void *arg) {
if (crawlers[i].it_flags != 1) {
continue;
}
+
+ /* Get memory from bipbuf, if client has no space, flush. */
+ // TODO: Will become a callback run here.
+ if (crawlers[i].type == CRAWLER_METADUMP) {
+ int ret = item_crawler_metadump_getbuf(&crawler_client);
+ if (ret != 0) {
+ item_crawler_class_done(i);
+ continue;
+ }
+ }
pthread_mutex_lock(&lru_locks[i]);
search = crawler_crawl_q((item *)&crawlers[i]);
if (search == NULL ||
(crawlers[i].remaining && --crawlers[i].remaining < 1)) {
if (settings.verbose > 2)
fprintf(stderr, "Nothing left to crawl for %d\n", i);
- crawlers[i].it_flags = 0;
- crawler_count--;
- crawler_unlink_q((item *)&crawlers[i]);
- pthread_mutex_unlock(&lru_locks[i]);
- pthread_mutex_lock(&lru_crawler_stats_lock);
- crawlerstats[CLEAR_LRU(i)].end_time = current_time;
- crawlerstats[CLEAR_LRU(i)].run_complete = true;
- pthread_mutex_unlock(&lru_crawler_stats_lock);
+ item_crawler_class_done(i);
continue;
}
uint32_t hv = hash(ITEM_key(search), search->nkey);
@@ -1392,13 +1521,17 @@ static void *item_crawler_thread(void *arg) {
/* Frees the item or decrements the refcount. */
/* Interface for this could improve: do the free/decr here
* instead? */
+ if (crawlers[i].type == CRAWLER_METADUMP)
+ pthread_mutex_unlock(&lru_locks[i]);
+
pthread_mutex_lock(&lru_crawler_stats_lock);
- item_crawler_evaluate(search, hv, i);
+ crawlers[i].eval(search, hv, i);
pthread_mutex_unlock(&lru_crawler_stats_lock);
if (hold_lock)
item_trylock_unlock(hold_lock);
- pthread_mutex_unlock(&lru_locks[i]);
+ if (crawlers[i].type != CRAWLER_METADUMP)
+ pthread_mutex_unlock(&lru_locks[i]);
if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
usleep(settings.lru_crawler_sleep);
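The lock juggling here is deliberate: the metadump eval can block in poll() while the client drains, so holding the per-class LRU lock across it would stall every access to that slab class. Summarized as of this patch:

    /* Lock discipline per crawl type:
     *   CRAWLER_EXPIRED:  eval runs with lru_locks[i] held, because
     *                     item_crawler_evaluate() may unlink/free items.
     *   CRAWLER_METADUMP: lru_locks[i] is released first; the item stays
     *                     pinned by its refcount (plus the item_trylock in
     *                     hold_lock), and eval may block on the client fd. */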
@@ -1470,7 +1603,7 @@ int start_item_crawler_thread(void) {
/* 'remaining' is passed in so the LRU maintainer thread can scrub the whole
* LRU every time.
*/
-static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
+static int do_lru_crawler_start(uint32_t id, uint32_t remaining, enum crawler_run_type type) {
int i;
uint32_t sid;
uint32_t tocrawl[3];
@@ -1493,6 +1626,16 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
crawlers[sid].time = 0;
crawlers[sid].remaining = remaining;
crawlers[sid].slabs_clsid = sid;
+ crawlers[sid].type = type;
+ switch (type) {
+ case CRAWLER_METADUMP:
+ crawlers[sid].eval = item_crawler_metadump;
+ break;
+ case CRAWLER_EXPIRED:
+ default:
+ crawlers[sid].eval = item_crawler_evaluate;
+ break;
+ }
crawler_link_q((item *)&crawlers[sid]);
crawler_count++;
starts++;
@@ -1512,12 +1655,12 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
return starts;
}
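The switch amounts to picking a per-run callback once at start time, letting the hot loop call crawlers[i].eval() without branching on the run type. An equivalent selection, with the function-pointer type assumed from how the diff invokes eval (the field itself is added to the crawler struct outside items.c):

    /* Hypothetical typedef matching the eval(search, hv, i) calls above: */
    typedef void (*crawler_eval_func)(item *it, uint32_t hv, int i);

    crawlers[sid].eval = (type == CRAWLER_METADUMP) ? item_crawler_metadump
                                                    : item_crawler_evaluate;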
-static int lru_crawler_start(uint32_t id, uint32_t remaining) {
+static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type) {
int starts;
if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
return 0;
}
- starts = do_lru_crawler_start(id, remaining);
+ starts = do_lru_crawler_start(id, remaining, type);
if (starts) {
pthread_cond_signal(&lru_crawler_cond);
}
@@ -1525,12 +1668,30 @@ static int lru_crawler_start(uint32_t id, uint32_t remaining) {
return starts;
}
+/* FIXME: Temporary hack, since we can't yet pass this information into
+ * lru_crawler_crawl(), which has the proper locks/etc.
+ * Multiple parallel commands could race, but that case isn't covered by testing.
+ */
+int lru_crawler_set_client(void *c, const int sfd) {
+ if (crawler_client.c != NULL) {
+ return -1;
+ }
+ crawler_client.c = c;
+ crawler_client.sfd = sfd;
+
+ crawler_client.buf = bipbuf_new(1024 * 128);
+ if (crawler_client.buf == NULL) {
+ return -2;
+ }
+ return 0;
+}
+
/* FIXME: Split this into two functions: one to kick a crawler for a sid, and one to
* parse the string. LRU maintainer code is generating a string to set up a
* sid.
* Also only clear the crawlerstats once per sid.
*/
-enum crawler_result_type lru_crawler_crawl(char *slabs) {
+enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type) {
char *b = NULL;
uint32_t sid = 0;
int starts = 0;
@@ -1561,7 +1722,7 @@ enum crawler_result_type lru_crawler_crawl(char *slabs) {
for (sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
if (tocrawl[sid])
- starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl);
+ starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl, type);
}
if (starts) {
pthread_cond_signal(&lru_crawler_cond);
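Nothing in this diff wires the new entry points into the protocol layer yet; per the FIXME above, a hypothetical caller (the error helper and the "1,2,3" class list are placeholders, not part of this patch) would look roughly like:

    /* Hypothetical protocol-side wiring for a metadump command: */
    if (lru_crawler_set_client(c, c->sfd) != 0) {
        /* another dump owns the client slot, or the bipbuf failed to allocate */
        report_client_error(c);   /* placeholder helper */
    } else if (lru_crawler_crawl("1,2,3", CRAWLER_METADUMP) != CRAWLER_OK) {
        report_client_error(c);   /* placeholder helper */
    }
    /* on success, the crawler thread streams "key=..." lines to c->sfd */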