-rw-r--r--  items.c      195
-rw-r--r--  items.h        3
-rw-r--r--  memcached.c   33
-rw-r--r--  memcached.h    9
-rw-r--r--  util.c        35
-rw-r--r--  util.h         4
6 files changed, 260 insertions, 19 deletions
diff --git a/items.c b/items.c
index 73a0005..c81b193 100644
--- a/items.c
+++ b/items.c
@@ -13,6 +13,7 @@
#include <time.h>
#include <assert.h>
#include <unistd.h>
+#include <poll.h>
/* Forward Declarations */
static void item_link_q(item *it);
@@ -56,9 +57,17 @@ typedef struct {
bool run_complete;
} crawlerstats_t;
+typedef struct {
+ void *c; /* original connection structure. still with source thread attached. */
+ int sfd; /* client fd. */
+ bipbuf_t *buf; /* output buffer */
+ char *cbuf; /* current buffer */
+} crawler_client_t;
+
static item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];
static crawler crawlers[LARGEST_ID];
+static crawler_client_t crawler_client;
static itemstats_t itemstats[LARGEST_ID];
static unsigned int sizes[LARGEST_ID];
static uint64_t sizes_bytes[LARGEST_ID];
@@ -91,7 +100,7 @@ void item_stats_reset(void) {
static int lru_pull_tail(const int orig_id, const int cur_lru,
const uint64_t total_bytes, const bool do_evict);
-static int lru_crawler_start(uint32_t id, uint32_t remaining);
+static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type);
/* Get the next CAS id for a new item. */
/* TODO: refactor some atomics for this. */
@@ -1050,7 +1059,7 @@ static void lru_maintainer_crawler_check(void) {
crawlerstats_t *s = &crawlerstats[i];
/* We've not successfully kicked off a crawl yet. */
if (last_crawls[i] == 0) {
- if (lru_crawler_start(i, 0) > 0) {
+ if (lru_crawler_start(i, 0, CRAWLER_EXPIRED) > 0) {
last_crawls[i] = current_time;
}
}
@@ -1336,6 +1345,123 @@ static void item_crawler_evaluate(item *search, uint32_t hv, int i) {
}
}
+static void item_crawler_metadump(item *it, uint32_t hv, int i) {
+ //int slab_id = CLEAR_LRU(i);
+ char keybuf[KEY_MAX_LENGTH * 3 + 1];
+ int is_flushed = item_is_flushed(it);
+ /* Ignore expired content. */
+ if ((it->exptime != 0 && it->exptime < current_time)
+ || is_flushed) {
+ refcount_decr(&it->refcount);
+ return;
+ }
+ // TODO: uriencode directly into the buffer.
+ uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_LENGTH * 3 + 1);
+ int total = snprintf(crawler_client.cbuf, 4096,
+ "key=%s exp=%ld la=%u cas=%llu fetch=%s\n",
+ keybuf,
+ (it->exptime == 0) ? -1 : (long)it->exptime + process_started,
+ it->time,
+ (unsigned long long)ITEM_get_cas(it),
+ (it->it_flags & ITEM_FETCHED) ? "yes" : "no");
+ refcount_decr(&it->refcount);
+ // TODO: some way of tracking the errors. these are very unlikely though.
+ if (total >= 4096 || total <= 0) {
+ /* Failed to write, don't push it. */
+ return;
+ }
+ bipbuf_push(crawler_client.buf, total);
+}
+
+static void item_crawler_close_client(crawler_client_t *c) {
+ fprintf(stderr, "CRAWLER: Closing client\n");
+ sidethread_conn_close(c->c);
+ c->c = NULL;
+ c->cbuf = NULL;
+ bipbuf_free(c->buf);
+ c->buf = NULL;
+}
+
+static int item_crawler_metadump_poll(crawler_client_t *c) {
+ unsigned char *data;
+ unsigned int data_size = 0;
+ struct pollfd to_poll[1];
+ to_poll[0].fd = c->sfd;
+ to_poll[0].events = POLLOUT;
+
+ int ret = poll(to_poll, 1, 1000);
+
+ if (ret < 0) {
+ // fatal.
+ return -1;
+ }
+
+ if (ret == 0) return 0;
+
+ if (to_poll[0].revents & POLLIN) {
+ char buf[1];
+ int res = read(c->sfd, buf, 1);
+ if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
+ item_crawler_close_client(c);
+ return -1;
+ }
+ }
+ if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) {
+ if (to_poll[0].revents & (POLLHUP|POLLERR)) {
+ item_crawler_close_client(c);
+ return -1;
+ } else if (to_poll[0].revents & POLLOUT) {
+ int total = write(c->sfd, data, data_size);
+ if (total == -1) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ item_crawler_close_client(c);
+ return -1;
+ }
+ } else if (total == 0) {
+ item_crawler_close_client(c);
+ return -1;
+ } else {
+ bipbuf_poll(c->buf, total);
+ }
+ }
+ }
+ return 0;
+}
+
+/* Grab some space to work with; if none exists, run the poll() loop and wait
+ * for it to clear up or close.
+ * Returns -1 if the client was closed, 0 on success.
+ */
+static int item_crawler_metadump_getbuf(crawler_client_t *c) {
+ void *buf = NULL;
+ if (c->c == NULL) return -1;
+ /* not enough space. */
+ while ((buf = bipbuf_request(c->buf, 4096)) == NULL) {
+ // TODO: max loops before closing.
+ int ret = item_crawler_metadump_poll(c);
+ if (ret < 0) return ret;
+ }
+
+ c->cbuf = buf;
+ return 0;
+}
+
+static void item_crawler_class_done(int i) {
+ crawlers[i].it_flags = 0;
+ crawler_count--;
+ crawler_unlink_q((item *)&crawlers[i]);
+ pthread_mutex_unlock(&lru_locks[i]);
+ pthread_mutex_lock(&lru_crawler_stats_lock);
+ crawlerstats[CLEAR_LRU(i)].end_time = current_time;
+ crawlerstats[CLEAR_LRU(i)].run_complete = true;
+ pthread_mutex_unlock(&lru_crawler_stats_lock);
+
+ if (crawlers[i].type == CRAWLER_METADUMP && crawler_client.c != NULL) {
+ item_crawler_metadump_poll(&crawler_client);
+ item_crawler_close_client(&crawler_client);
+ }
+}
+
static void *item_crawler_thread(void *arg) {
int i;
int crawls_persleep = settings.crawls_persleep;
@@ -1356,20 +1482,23 @@ static void *item_crawler_thread(void *arg) {
if (crawlers[i].it_flags != 1) {
continue;
}
+
+ /* Get memory from bipbuf, if client has no space, flush. */
+ // TODO: Will become a callback run here.
+ if (crawlers[i].type == CRAWLER_METADUMP) {
+ int ret = item_crawler_metadump_getbuf(&crawler_client);
+ if (ret != 0) {
+ item_crawler_class_done(i);
+ continue;
+ }
+ }
pthread_mutex_lock(&lru_locks[i]);
search = crawler_crawl_q((item *)&crawlers[i]);
if (search == NULL ||
(crawlers[i].remaining && --crawlers[i].remaining < 1)) {
if (settings.verbose > 2)
fprintf(stderr, "Nothing left to crawl for %d\n", i);
- crawlers[i].it_flags = 0;
- crawler_count--;
- crawler_unlink_q((item *)&crawlers[i]);
- pthread_mutex_unlock(&lru_locks[i]);
- pthread_mutex_lock(&lru_crawler_stats_lock);
- crawlerstats[CLEAR_LRU(i)].end_time = current_time;
- crawlerstats[CLEAR_LRU(i)].run_complete = true;
- pthread_mutex_unlock(&lru_crawler_stats_lock);
+ item_crawler_class_done(i);
continue;
}
uint32_t hv = hash(ITEM_key(search), search->nkey);
@@ -1392,13 +1521,17 @@ static void *item_crawler_thread(void *arg) {
/* Frees the item or decrements the refcount. */
/* Interface for this could improve: do the free/decr here
* instead? */
+ if (crawlers[i].type == CRAWLER_METADUMP)
+ pthread_mutex_unlock(&lru_locks[i]);
+
pthread_mutex_lock(&lru_crawler_stats_lock);
- item_crawler_evaluate(search, hv, i);
+ crawlers[i].eval(search, hv, i);
pthread_mutex_unlock(&lru_crawler_stats_lock);
if (hold_lock)
item_trylock_unlock(hold_lock);
- pthread_mutex_unlock(&lru_locks[i]);
+ if (crawlers[i].type != CRAWLER_METADUMP)
+ pthread_mutex_unlock(&lru_locks[i]);
if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
usleep(settings.lru_crawler_sleep);
@@ -1470,7 +1603,7 @@ int start_item_crawler_thread(void) {
/* 'remaining' is passed in so the LRU maintainer thread can scrub the whole
* LRU every time.
*/
-static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
+static int do_lru_crawler_start(uint32_t id, uint32_t remaining, enum crawler_run_type type) {
int i;
uint32_t sid;
uint32_t tocrawl[3];
@@ -1493,6 +1626,16 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
crawlers[sid].time = 0;
crawlers[sid].remaining = remaining;
crawlers[sid].slabs_clsid = sid;
+ crawlers[sid].type = type;
+ switch (type) {
+ case CRAWLER_METADUMP:
+ crawlers[sid].eval = item_crawler_metadump;
+ break;
+ case CRAWLER_EXPIRED:
+ default:
+ crawlers[sid].eval = item_crawler_evaluate;
+ break;
+ }
crawler_link_q((item *)&crawlers[sid]);
crawler_count++;
starts++;
@@ -1512,12 +1655,12 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
return starts;
}
-static int lru_crawler_start(uint32_t id, uint32_t remaining) {
+static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type) {
int starts;
if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
return 0;
}
- starts = do_lru_crawler_start(id, remaining);
+ starts = do_lru_crawler_start(id, remaining, type);
if (starts) {
pthread_cond_signal(&lru_crawler_cond);
}
@@ -1525,12 +1668,30 @@ static int lru_crawler_start(uint32_t id, uint32_t remaining) {
return starts;
}
+/* FIXME: Temporary hack since we can't yet pass this information into
+ * lru_crawler_crawl.. which has the proper locks/etc.
+ * Multiple parallel commands could race, but isn't part of the testing.
+ */
+int lru_crawler_set_client(void *c, const int sfd) {
+ if (crawler_client.c != NULL) {
+ return -1;
+ }
+ crawler_client.c = c;
+ crawler_client.sfd = sfd;
+
+ crawler_client.buf = bipbuf_new(1024 * 128);
+ if (crawler_client.buf == NULL) {
+ return -2;
+ }
+ return 0;
+}
+
/* FIXME: Split this into two functions: one to kick a crawler for a sid, and one to
* parse the string. LRU maintainer code is generating a string to set up a
* sid.
* Also only clear the crawlerstats once per sid.
*/
-enum crawler_result_type lru_crawler_crawl(char *slabs) {
+enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type) {
char *b = NULL;
uint32_t sid = 0;
int starts = 0;
@@ -1561,7 +1722,7 @@ enum crawler_result_type lru_crawler_crawl(char *slabs) {
for (sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
if (tocrawl[sid])
- starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl);
+ starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl, type);
}
if (starts) {
pthread_cond_signal(&lru_crawler_cond);
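The metadump path above splits work between the crawler thread (producer) and a poll-driven flush loop (consumer) over a single bipbuf. A minimal sketch of that request/push then peek/poll cycle, using only the bipbuf_* calls the patch itself relies on; the helper names and the 4096-byte size are illustrative, and the sketch assumes items.c's existing includes and the crawler_client_t type:

static int metadump_write_line(crawler_client_t *c, const char *line) {
    /* Producer: reserve space, format into it, commit only what was written. */
    char *wbuf = (char *) bipbuf_request(c->buf, 4096);
    if (wbuf == NULL)
        return -1;                   /* no free space; caller must flush first */
    int total = snprintf(wbuf, 4096, "%s\n", line);
    if (total > 0 && total < 4096)
        bipbuf_push(c->buf, total);  /* commit exactly 'total' bytes */
    /* on a formatting failure nothing is committed and the line is dropped */
    return 0;
}

static void metadump_flush(crawler_client_t *c) {
    /* Consumer: peek at the committed bytes, write what the socket accepts,
     * then release only the bytes actually sent. */
    unsigned int data_size = 0;
    unsigned char *data = bipbuf_peek_all(c->buf, &data_size);
    if (data != NULL) {
        int sent = write(c->sfd, data, data_size);
        if (sent > 0)
            bipbuf_poll(c->buf, sent);
    }
}

Note that the socket flush happens in item_crawler_metadump_getbuf before the LRU lock is taken, and the lock is dropped before the metadump eval runs, so a slow client does not stall other threads waiting on lru_locks[i].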
diff --git a/items.h b/items.h
index 32aefb8..848f8c0 100644
--- a/items.h
+++ b/items.h
@@ -47,6 +47,7 @@ void lru_maintainer_resume(void);
int start_item_crawler_thread(void);
int stop_item_crawler_thread(void);
int init_lru_crawler(void);
-enum crawler_result_type lru_crawler_crawl(char *slabs);
+enum crawler_result_type lru_crawler_crawl(char *slabs, enum crawler_run_type);
+int lru_crawler_set_client(void *c, const int sfd); /* FIXME: Temporary. */
void lru_crawler_pause(void);
void lru_crawler_resume(void);
diff --git a/memcached.c b/memcached.c
index 406180e..2cf849c 100644
--- a/memcached.c
+++ b/memcached.c
@@ -3964,7 +3964,7 @@ static void process_command(conn *c, char *command) {
return;
}
- rv = lru_crawler_crawl(tokens[2].value);
+ rv = lru_crawler_crawl(tokens[2].value, CRAWLER_EXPIRED);
switch(rv) {
case CRAWLER_OK:
out_string(c, "OK");
@@ -3980,6 +3980,33 @@ static void process_command(conn *c, char *command) {
break;
}
return;
+ } else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "metadump") == 0) {
+ if (settings.lru_crawler == false) {
+ out_string(c, "CLIENT_ERROR lru crawler disabled");
+ return;
+ }
+
+ // FIXME: check response code.
+ lru_crawler_set_client(c, c->sfd);
+ int rv = lru_crawler_crawl(tokens[2].value, CRAWLER_METADUMP);
+ switch(rv) {
+ case CRAWLER_OK:
+ out_string(c, "OK");
+ // TODO: Don't reuse conn_watch here.
+ conn_set_state(c, conn_watch);
+ event_del(&c->event);
+ break;
+ case CRAWLER_RUNNING:
+ out_string(c, "BUSY currently processing crawler request");
+ break;
+ case CRAWLER_BADCLASS:
+ out_string(c, "BADCLASS invalid class id");
+ break;
+ case CRAWLER_NOTSTARTED:
+ out_string(c, "NOTSTARTED no items to crawl");
+ break;
+ }
+ return;
} else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "tocrawl") == 0) {
uint32_t tocrawl;
if (!safe_strtoul(tokens[2].value, &tocrawl)) {
@@ -4015,6 +4042,7 @@ static void process_command(conn *c, char *command) {
} else {
out_string(c, "ERROR failed to stop lru crawler thread");
}
+ } else if ((strcmp(tokens[COMMAND_TOKEN + 1].value, "metadump") == 0)) {
} else {
out_string(c, "ERROR");
}
@@ -6372,6 +6400,9 @@ int main (int argc, char **argv) {
/* Drop privileges no longer needed */
drop_privileges();
+ /* Initialize the uriencode lookup table. */
+ uriencode_init();
+
/* enter the event loop */
if (event_base_loop(main_base, 0) != 0) {
retval = EXIT_FAILURE;
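From the client's side the new command parses like lru_crawler crawl: the class id list in tokens[2] goes through lru_crawler_crawl, the connection gets an OK, and key lines are then streamed. A hypothetical session (keys, timestamps, and counters are made up; the line format is the one built in item_crawler_metadump):

lru_crawler metadump 1
OK
key=foo exp=-1 la=5512 cas=12 fetch=yes
key=user%3A42%20profile exp=1478000000 la=5520 cas=31 fetch=no

Here exp is an absolute unix time (or -1 for items that never expire), la is the item's last-access time on the server's rel_time_t clock, and keys are uriencoded so keys containing whitespace stay one token per line. When the requested classes finish, item_crawler_class_done flushes whatever remains and closes the connection; this patch writes no explicit END marker.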
diff --git a/memcached.h b/memcached.h
index eb1fce6..a98d60f 100644
--- a/memcached.h
+++ b/memcached.h
@@ -419,6 +419,13 @@ typedef struct _stritem {
/* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
+typedef void (*crawler_eval_func)(item *it, uint32_t hv, int slab_cls);
+
+// TODO: If we eventually want user loaded modules, we can't use an enum :(
+enum crawler_run_type {
+ CRAWLER_EXPIRED=0, CRAWLER_METADUMP
+};
+
typedef struct {
struct _stritem *next;
struct _stritem *prev;
@@ -432,6 +439,8 @@ typedef struct {
uint8_t slabs_clsid;/* which slab class we're in */
uint8_t nkey; /* key length, w/terminating null and padding */
uint32_t remaining; /* Max keys to crawl per slab per invocation */
+ enum crawler_run_type type; /* which module to use during run */
+ crawler_eval_func eval; /* The function to run with the locked item */
} crawler;
/* Header when an item is actually a chunk of another item. */
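The type/eval pair added to the crawler struct is what lets do_lru_crawler_start pick a per-run callback, and the TODO above hints at further modules later. A hypothetical extra callback, only to show the crawler_eval_func shape (item_crawler_count is not part of this patch):

/* Hypothetical crawler module: counts items instead of expiring or dumping
 * them. The callback owns the reference the crawler thread took, so it must
 * drop it, just as item_crawler_evaluate and item_crawler_metadump do. */
static uint64_t crawled_items = 0;

static void item_crawler_count(item *it, uint32_t hv, int i) {
    (void)hv; (void)i;
    crawled_items++;
    refcount_decr(&it->refcount);
}

Wiring it in would mean another enum value plus another case in the switch inside do_lru_crawler_start that assigns crawlers[sid].eval = item_crawler_count.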
diff --git a/util.c b/util.c
index cbb0352..e9ec591 100644
--- a/util.c
+++ b/util.c
@@ -8,6 +8,41 @@
#include "memcached.h"
+static char *uriencode_map[256];
+static char uriencode_str[768];
+
+void uriencode_init(void) {
+ int x;
+ char *str = uriencode_str;
+ for (x = 0; x < 256; x++) {
+ if (isalnum(x) || x == '-' || x == '.' || x == '_' || x == '~') {
+ uriencode_map[x] = NULL;
+ } else {
+ snprintf(str, 4, "%%%02X", x);
+ uriencode_map[x] = str;
+ str += 3; /* lopping off the \0 is fine */
+ }
+ }
+}
+
+bool uriencode(const char *src, char *dst, const size_t srclen, const size_t dstlen) {
+ int x;
+ size_t d = 0;
+ for (x = 0; x < srclen; x++) {
+ if (d + 4 >= dstlen)
+ return false;
+ if (uriencode_map[(unsigned char) src[x]] != NULL) {
+ memcpy(&dst[d], uriencode_map[(unsigned char) src[x]], 3);
+ d += 3;
+ } else {
+ dst[d] = src[x];
+ d++;
+ }
+ }
+ dst[d] = '\0';
+ return true;
+}
+
/* Avoid warnings on solaris, where isspace() is an index into an array, and gcc uses signed chars */
#define xisspace(c) isspace((unsigned char)c)
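A small usage sketch for the uriencode helpers above. The key and buffer sizes are illustrative; the fixed requirements are a destination of at least srclen * 3 + 1 bytes for the all-escaped worst case, and a single uriencode_init() call before any encode, as main() now does:

#include <stdio.h>
#include <stdbool.h>
#include <stdint.h>
#include "util.h"   /* declares uriencode_init() and uriencode() */

static int uriencode_example(void) {
    uriencode_init();                 /* build the 256-entry lookup table once */

    const char key[] = "user:42 profile";
    char out[sizeof(key) * 3 + 1];    /* worst case: every byte becomes %XX */

    if (!uriencode(key, out, sizeof(key) - 1, sizeof(out)))
        return -1;                    /* destination too small */
    printf("%s\n", out);              /* prints "user%3A42%20profile" */
    return 0;
}

Unreserved bytes (alphanumerics plus - . _ ~) pass through untouched; everything else becomes a three-byte %XX escape, which is why the metadump code sizes keybuf as KEY_MAX_LENGTH * 3 + 1.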
diff --git a/util.h b/util.h
index 1ec1a52..33ca04a 100644
--- a/util.h
+++ b/util.h
@@ -1,3 +1,7 @@
+/* fast-enough functions for uriencoding strings. */
+void uriencode_init(void);
+bool uriencode(const char *src, char *dst, const size_t srclen, const size_t dstlen);
+
/*
* Wrappers around strtoull/strtoll that are safer and easier to
* use. For tests and assumptions, see internal_tests.c.