summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2016-08-04 17:55:06 -0700
committerdormando <dormando@rydia.net>2016-08-19 17:34:55 -0700
commit502e94080d30a77e05d6373b25322e7160a91f39 (patch)
tree95c2cb78971064c95aecd5787885bf12703bf752
parent31e7d8f5feb4a4a4c52557b2a2597bd074de47ca (diff)
downloadmemcached-502e94080d30a77e05d6373b25322e7160a91f39.tar.gz
refactor checkpoint for LRU crawler
Now has an internal module system for the LRU crawler. The autocrawl checker should be a bit better now and doesn't constantly re-run the histogram calcs. Metadump works as a module now. Ended up generalizing the client case outside of the module system since it looks reusable. Cut the number of functions required specifically for metadump to nothing. Still need to bug hunt, do a few more smaller refactors, and see about pulling this out into its own file.
-rw-r--r--items.c428
-rw-r--r--items.h5
-rw-r--r--memcached.c13
-rw-r--r--memcached.h4
4 files changed, 296 insertions, 154 deletions
diff --git a/items.c b/items.c
index d7de9cd..e245eaa 100644
--- a/items.c
+++ b/items.c
@@ -47,6 +47,36 @@ typedef struct {
} itemstats_t;
typedef struct {
+ void *c; /* original connection structure. still with source thread attached. */
+ int sfd; /* client fd. */
+ bipbuf_t *buf; /* output buffer */
+ char *cbuf; /* current buffer */
+} crawler_client_t;
+
+typedef struct _crawler_module_t crawler_module_t;
+
+typedef void (*crawler_eval_func)(crawler_module_t *cm, item *it, uint32_t hv, int slab_cls);
+typedef int (*crawler_init_func)(crawler_module_t *cm, void *data); // TODO: init args?
+typedef void (*crawler_deinit_func)(crawler_module_t *cm); // TODO: extra args?
+typedef void (*crawler_doneclass_func)(crawler_module_t *cm, int slab_cls);
+typedef void (*crawler_finalize_func)(crawler_module_t *cm);
+
+typedef struct {
+ crawler_init_func init; /* run before crawl starts */
+ crawler_eval_func eval; /* runs on an item. */
+ crawler_doneclass_func doneclass; /* runs once per sub-crawler completion. */
+ crawler_finalize_func finalize; /* runs once when all sub-crawlers are done. */
+ bool needs_lock; /* whether or not we need the LRU lock held when eval is called */
+ bool needs_client; /* whether or not to grab onto the remote client */
+} crawler_module_reg_t;
+
+struct _crawler_module_t {
+ void *data; /* opaque data pointer */
+ crawler_client_t c;
+ crawler_module_reg_t *mod;
+};
+
+typedef struct {
uint64_t histo[61];
uint64_t ttl_hourplus;
uint64_t noexp;
@@ -57,24 +87,57 @@ typedef struct {
bool run_complete;
} crawlerstats_t;
-typedef struct {
- void *c; /* original connection structure. still with source thread attached. */
- int sfd; /* client fd. */
- bipbuf_t *buf; /* output buffer */
- char *cbuf; /* current buffer */
-} crawler_client_t;
+struct crawler_expired_data {
+ pthread_mutex_t lock;
+ crawlerstats_t crawlerstats[MAX_NUMBER_OF_SLAB_CLASSES];
+ /* redundant with crawlerstats_t so we can get overall start/stop/done */
+ rel_time_t start_time;
+ rel_time_t end_time;
+ bool crawl_complete;
+ bool is_external; /* whether this was an alloc local or remote to the module. */
+};
+
+static int crawler_expired_init(crawler_module_t *cm, void *data);
+static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls);
+static void crawler_expired_finalize(crawler_module_t *cm);
+static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i);
+
+crawler_module_reg_t crawler_expired_mod = {
+ .init = crawler_expired_init,
+ .eval = crawler_expired_eval,
+ .doneclass = crawler_expired_doneclass,
+ .finalize = crawler_expired_finalize,
+ .needs_lock = true,
+ .needs_client = false
+};
+
+static void crawler_metadump_eval(crawler_module_t *cm, item *search, uint32_t hv, int i);
+
+crawler_module_reg_t crawler_metadump_mod = {
+ .init = NULL,
+ .eval = crawler_metadump_eval,
+ .doneclass = NULL,
+ .finalize = NULL,
+ .needs_lock = false,
+ .needs_client = true
+};
+
+crawler_module_reg_t *crawler_mod_regs[2] = {
+ &crawler_expired_mod,
+ &crawler_metadump_mod
+};
+
+crawler_module_t active_crawler_mod;
static item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];
static crawler crawlers[LARGEST_ID];
-static crawler_client_t crawler_client;
static itemstats_t itemstats[LARGEST_ID];
static unsigned int sizes[LARGEST_ID];
static uint64_t sizes_bytes[LARGEST_ID];
static unsigned int *stats_sizes_hist = NULL;
static uint64_t stats_sizes_cas_min = 0;
static int stats_sizes_buckets = 0;
-static crawlerstats_t crawlerstats[MAX_NUMBER_OF_SLAB_CLASSES];
static int crawler_count = 0;
static volatile int do_run_lru_crawler_thread = 0;
@@ -84,7 +147,6 @@ static int lru_maintainer_initialized = 0;
static int lru_maintainer_check_clsid = 0;
static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t lru_crawler_cond = PTHREAD_COND_INITIALIZER;
-static pthread_mutex_t lru_crawler_stats_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lru_maintainer_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t cas_id_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t stats_sizes_lock = PTHREAD_MUTEX_INITIALIZER;
@@ -103,7 +165,8 @@ void item_stats_reset(void) {
static int lru_pull_tail(const int orig_id, const int cur_lru,
const uint64_t total_bytes, uint8_t flags);
-static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type);
+static int lru_crawler_start(uint8_t *ids, uint32_t remaining, const enum crawler_run_type type,
+ void *data, void *c, const int sfd);
/* Get the next CAS id for a new item. */
/* TODO: refactor some atomics for this. */
@@ -1058,20 +1121,22 @@ static int lru_maintainer_juggle(const int slabs_clsid) {
* The latter is to avoid newly started daemons from waiting too long before
* retrying a crawl.
*/
-static void lru_maintainer_crawler_check(void) {
+static void lru_maintainer_crawler_check(struct crawler_expired_data *cdata) {
int i;
- static rel_time_t last_crawls[MAX_NUMBER_OF_SLAB_CLASSES];
+ static rel_time_t next_crawls[MAX_NUMBER_OF_SLAB_CLASSES];
static rel_time_t next_crawl_wait[MAX_NUMBER_OF_SLAB_CLASSES];
+ uint8_t todo[MAX_NUMBER_OF_SLAB_CLASSES];
+ memset(todo, 0, sizeof(uint8_t) * MAX_NUMBER_OF_SLAB_CLASSES);
+ bool do_run = false;
+ if (!cdata->crawl_complete) {
+ return;
+ }
+
for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) {
- crawlerstats_t *s = &crawlerstats[i];
+ crawlerstats_t *s = &cdata->crawlerstats[i];
/* We've not successfully kicked off a crawl yet. */
- if (last_crawls[i] == 0) {
- if (lru_crawler_start(i, 0, CRAWLER_EXPIRED) > 0) {
- last_crawls[i] = current_time;
- }
- }
- pthread_mutex_lock(&lru_crawler_stats_lock);
if (s->run_complete) {
+ pthread_mutex_lock(&cdata->lock);
int x;
/* Should we crawl again? */
uint64_t possible_reclaims = s->seen - s->noexp;
@@ -1083,27 +1148,45 @@ static void lru_maintainer_crawler_check(void) {
rel_time_t since_run = current_time - s->end_time;
/* Don't bother if the payoff is too low. */
if (settings.verbose > 1)
- fprintf(stderr, "maint crawler: low_watermark: %llu, possible_reclaims: %llu, since_run: %u\n",
- (unsigned long long)low_watermark, (unsigned long long)possible_reclaims,
+ fprintf(stderr, "maint crawler[%d]: low_watermark: %llu, possible_reclaims: %llu, since_run: %u\n",
+ i, (unsigned long long)low_watermark, (unsigned long long)possible_reclaims,
(unsigned int)since_run);
for (x = 0; x < 60; x++) {
- if (since_run < (x * 60) + 60)
- break;
available_reclaims += s->histo[x];
+ if (available_reclaims > low_watermark) {
+ if (next_crawl_wait[i] < (x * 60)) {
+ next_crawl_wait[i] += 60;
+ } else if (next_crawl_wait[i] >= 60) {
+ next_crawl_wait[i] -= 60;
+ }
+ break;
+ }
}
- if (available_reclaims > low_watermark) {
- last_crawls[i] = 0;
- if (next_crawl_wait[i] > 60)
- next_crawl_wait[i] -= 60;
- } else if (since_run > 5 && since_run > next_crawl_wait[i]) {
- last_crawls[i] = 0;
- if (next_crawl_wait[i] < MAX_MAINTCRAWL_WAIT)
- next_crawl_wait[i] += 60;
+
+ if (available_reclaims == 0) {
+ next_crawl_wait[i] += 60;
+ }
+
+ if (next_crawl_wait[i] > MAX_MAINTCRAWL_WAIT) {
+ next_crawl_wait[i] = MAX_MAINTCRAWL_WAIT;
}
+
+ next_crawls[i] = current_time + next_crawl_wait[i] + 5;
if (settings.verbose > 1)
- fprintf(stderr, "maint crawler: available reclaims: %llu, next_crawl: %u\n", (unsigned long long)available_reclaims, next_crawl_wait[i]);
+ fprintf(stderr, "maint crawler[%d]: next_crawl: %u, [%d] now: [%d]\n",
+ i, next_crawl_wait[i], next_crawls[i], current_time);
+ // Got our calculation, avoid running until next actual run.
+ s->run_complete = false;
+ pthread_mutex_unlock(&cdata->lock);
}
- pthread_mutex_unlock(&lru_crawler_stats_lock);
+ if (current_time > next_crawls[i]) {
+ todo[i] = 1;
+ do_run = true;
+ next_crawls[i] = current_time + 5; // minimum retry wait.
+ }
+ }
+ if (do_run) {
+ lru_crawler_start(todo, 0, CRAWLER_EXPIRED, cdata, NULL, 0);
}
}
@@ -1116,6 +1199,10 @@ static void *lru_maintainer_thread(void *arg) {
int i;
useconds_t to_sleep = MIN_LRU_MAINTAINER_SLEEP;
rel_time_t last_crawler_check = 0;
+ struct crawler_expired_data cdata;
+ memset(&cdata, 0, sizeof(struct crawler_expired_data));
+ pthread_mutex_init(&cdata.lock, NULL);
+ cdata.crawl_complete = true; // kick off the crawler.
pthread_mutex_lock(&lru_maintainer_lock);
if (settings.verbose > 2)
@@ -1149,7 +1236,7 @@ static void *lru_maintainer_thread(void *arg) {
}
/* Once per second at most */
if (settings.lru_crawler && last_crawler_check != current_time) {
- lru_maintainer_crawler_check();
+ lru_maintainer_crawler_check(&cdata);
last_crawler_check = current_time;
}
}
@@ -1308,13 +1395,83 @@ static item *crawler_crawl_q(item *it) {
return it->next; /* success */
}
+#define LRU_CRAWLER_WRITEBUF 8192
+
+static void lru_crawler_close_client(crawler_client_t *c) {
+ //fprintf(stderr, "CRAWLER: Closing client\n");
+ sidethread_conn_close(c->c);
+ c->c = NULL;
+ c->cbuf = NULL;
+ bipbuf_free(c->buf);
+ c->buf = NULL;
+}
+
+static void lru_crawler_release_client(crawler_client_t *c) {
+ //fprintf(stderr, "CRAWLER: Closing client\n");
+ redispatch_conn(c->c);
+ c->c = NULL;
+ c->cbuf = NULL;
+ bipbuf_free(c->buf);
+ c->buf = NULL;
+}
+
+static int crawler_expired_init(crawler_module_t *cm, void *data) {
+ struct crawler_expired_data *d;
+ if (data != NULL) {
+ d = data;
+ d->is_external = true;
+ cm->data = data;
+ } else {
+ // allocate data.
+ d = calloc(1, sizeof(struct crawler_expired_data));
+ if (d == NULL) {
+ return -1;
+ }
+ // init lock.
+ pthread_mutex_init(&d->lock, NULL);
+ d->is_external = false;
+ d->start_time = current_time;
+
+ cm->data = d;
+ }
+ pthread_mutex_lock(&d->lock);
+ memset(&d->crawlerstats, 0, sizeof(crawlerstats_t) * MAX_NUMBER_OF_SLAB_CLASSES);
+ for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
+ d->crawlerstats[x].start_time = current_time;
+ d->crawlerstats[x].run_complete = false;
+ }
+ pthread_mutex_unlock(&d->lock);
+ return 0;
+}
+
+static void crawler_expired_doneclass(crawler_module_t *cm, int slab_cls) {
+ struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
+ pthread_mutex_lock(&d->lock);
+ d->crawlerstats[CLEAR_LRU(slab_cls)].end_time = current_time;
+ d->crawlerstats[CLEAR_LRU(slab_cls)].run_complete = true;
+ pthread_mutex_unlock(&d->lock);
+}
+
+static void crawler_expired_finalize(crawler_module_t *cm) {
+ struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
+ pthread_mutex_lock(&d->lock);
+ d->end_time = current_time;
+ d->crawl_complete = true;
+ pthread_mutex_unlock(&d->lock);
+
+ if (!d->is_external) {
+ free(d);
+ }
+}
+
/* I pulled this out to make the main thread clearer, but it reaches into the
* main thread's values too much. Should rethink again.
*/
-static void item_crawler_evaluate(item *search, uint32_t hv, int i) {
+static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv, int i) {
int slab_id = CLEAR_LRU(i);
- crawlerstats_t *s = &crawlerstats[slab_id];
- itemstats[i].crawler_items_checked++;
+ struct crawler_expired_data *d = (struct crawler_expired_data *) cm->data;
+ pthread_mutex_lock(&d->lock);
+ crawlerstats_t *s = &d->crawlerstats[slab_id];
int is_flushed = item_is_flushed(search);
if ((search->exptime != 0 && search->exptime < current_time)
|| is_flushed) {
@@ -1350,9 +1507,10 @@ static void item_crawler_evaluate(item *search, uint32_t hv, int i) {
s->histo[bucket]++;
}
}
+ pthread_mutex_unlock(&d->lock);
}
-static void item_crawler_metadump(item *it, uint32_t hv, int i) {
+static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) {
//int slab_id = CLEAR_LRU(i);
char keybuf[KEY_MAX_LENGTH * 3 + 1];
int is_flushed = item_is_flushed(it);
@@ -1364,7 +1522,7 @@ static void item_crawler_metadump(item *it, uint32_t hv, int i) {
}
// TODO: uriencode directly into the buffer.
uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_LENGTH * 3 + 1);
- int total = snprintf(crawler_client.cbuf, 4096,
+ int total = snprintf(cm->c.cbuf, 4096,
"key=%s exp=%ld la=%llu cas=%llu fetch=%s\n",
keybuf,
(it->exptime == 0) ? -1 : (long)it->exptime + process_started,
@@ -1373,32 +1531,14 @@ static void item_crawler_metadump(item *it, uint32_t hv, int i) {
(it->it_flags & ITEM_FETCHED) ? "yes" : "no");
refcount_decr(&it->refcount);
// TODO: some way of tracking the errors. these are very unlikely though.
- if (total >= 4096 || total <= 0) {
+ if (total >= LRU_CRAWLER_WRITEBUF - 1 || total <= 0) {
/* Failed to write, don't push it. */
return;
}
- bipbuf_push(crawler_client.buf, total);
-}
-
-static void item_crawler_close_client(crawler_client_t *c) {
- //fprintf(stderr, "CRAWLER: Closing client\n");
- sidethread_conn_close(c->c);
- c->c = NULL;
- c->cbuf = NULL;
- bipbuf_free(c->buf);
- c->buf = NULL;
+ bipbuf_push(cm->c.buf, total);
}
-static void item_crawler_release_client(crawler_client_t *c) {
- //fprintf(stderr, "CRAWLER: Closing client\n");
- redispatch_conn(c->c);
- c->c = NULL;
- c->cbuf = NULL;
- bipbuf_free(c->buf);
- c->buf = NULL;
-}
-
-static int item_crawler_metadump_poll(crawler_client_t *c) {
+static int lru_crawler_poll(crawler_client_t *c) {
unsigned char *data;
unsigned int data_size = 0;
struct pollfd to_poll[1];
@@ -1418,23 +1558,23 @@ static int item_crawler_metadump_poll(crawler_client_t *c) {
char buf[1];
int res = read(c->sfd, buf, 1);
if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
- item_crawler_close_client(c);
+ lru_crawler_close_client(c);
return -1;
}
}
if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) {
if (to_poll[0].revents & (POLLHUP|POLLERR)) {
- item_crawler_close_client(c);
+ lru_crawler_close_client(c);
return -1;
} else if (to_poll[0].revents & POLLOUT) {
int total = write(c->sfd, data, data_size);
if (total == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
- item_crawler_close_client(c);
+ lru_crawler_close_client(c);
return -1;
}
} else if (total == 0) {
- item_crawler_close_client(c);
+ lru_crawler_close_client(c);
return -1;
} else {
bipbuf_poll(c->buf, total);
@@ -1448,13 +1588,13 @@ static int item_crawler_metadump_poll(crawler_client_t *c) {
* for it to clear up or close.
* Return NULL if closed.
*/
-static int item_crawler_metadump_getbuf(crawler_client_t *c) {
+static int lru_crawler_client_getbuf(crawler_client_t *c) {
void *buf = NULL;
if (c->c == NULL) return -1;
/* not enough space. */
- while ((buf = bipbuf_request(c->buf, 4096)) == NULL) {
+ while ((buf = bipbuf_request(c->buf, LRU_CRAWLER_WRITEBUF)) == NULL) {
// TODO: max loops before closing.
- int ret = item_crawler_metadump_poll(c);
+ int ret = lru_crawler_poll(c);
if (ret < 0) return ret;
}
@@ -1462,19 +1602,13 @@ static int item_crawler_metadump_getbuf(crawler_client_t *c) {
return 0;
}
-static void item_crawler_class_done(int i) {
+static void lru_crawler_class_done(int i) {
crawlers[i].it_flags = 0;
crawler_count--;
crawler_unlink_q((item *)&crawlers[i]);
pthread_mutex_unlock(&lru_locks[i]);
- pthread_mutex_lock(&lru_crawler_stats_lock);
- crawlerstats[CLEAR_LRU(i)].end_time = current_time;
- crawlerstats[CLEAR_LRU(i)].run_complete = true;
- pthread_mutex_unlock(&lru_crawler_stats_lock);
-
- if (crawlers[i].type == CRAWLER_METADUMP && crawler_client.c != NULL) {
- item_crawler_metadump_poll(&crawler_client);
- }
+ if (active_crawler_mod.mod->doneclass != NULL)
+ active_crawler_mod.mod->doneclass(&active_crawler_mod, i);
}
static void *item_crawler_thread(void *arg) {
@@ -1499,11 +1633,11 @@ static void *item_crawler_thread(void *arg) {
}
/* Get memory from bipbuf, if client has no space, flush. */
- // TODO: Will become a callback run here.
- if (crawlers[i].type == CRAWLER_METADUMP) {
- int ret = item_crawler_metadump_getbuf(&crawler_client);
+ if (active_crawler_mod.c.c != NULL) {
+ int ret = lru_crawler_client_getbuf(&active_crawler_mod.c);
if (ret != 0) {
- item_crawler_class_done(i);
+ // FIXME: Kill off the whole run.
+ lru_crawler_class_done(i);
continue;
}
}
@@ -1513,7 +1647,7 @@ static void *item_crawler_thread(void *arg) {
(crawlers[i].remaining && --crawlers[i].remaining < 1)) {
if (settings.verbose > 2)
fprintf(stderr, "Nothing left to crawl for %d\n", i);
- item_crawler_class_done(i);
+ lru_crawler_class_done(i);
continue;
}
uint32_t hv = hash(ITEM_key(search), search->nkey);
@@ -1533,20 +1667,21 @@ static void *item_crawler_thread(void *arg) {
continue;
}
+ itemstats[i].crawler_items_checked++;
/* Frees the item or decrements the refcount. */
/* Interface for this could improve: do the free/decr here
* instead? */
- if (crawlers[i].type == CRAWLER_METADUMP)
+ if (!active_crawler_mod.mod->needs_lock) {
pthread_mutex_unlock(&lru_locks[i]);
+ }
- pthread_mutex_lock(&lru_crawler_stats_lock);
- crawlers[i].eval(search, hv, i);
- pthread_mutex_unlock(&lru_crawler_stats_lock);
+ active_crawler_mod.mod->eval(&active_crawler_mod, search, hv, i);
if (hold_lock)
item_trylock_unlock(hold_lock);
- if (crawlers[i].type != CRAWLER_METADUMP)
+ if (active_crawler_mod.mod->needs_lock) {
pthread_mutex_unlock(&lru_locks[i]);
+ }
if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
usleep(settings.lru_crawler_sleep);
@@ -1555,11 +1690,14 @@ static void *item_crawler_thread(void *arg) {
}
}
- // FIXME: Need to global-ize the type of the crawl, or we're calling
- // finalize on an opaque here.
- if (crawler_client.c != NULL) {
- item_crawler_metadump_poll(&crawler_client);
- item_crawler_release_client(&crawler_client);
+ if (active_crawler_mod.mod != NULL) {
+ if (active_crawler_mod.mod->finalize != NULL)
+ active_crawler_mod.mod->finalize(&active_crawler_mod);
+ if (active_crawler_mod.c.c != NULL) {
+ lru_crawler_poll(&active_crawler_mod.c);
+ lru_crawler_release_client(&active_crawler_mod.c);
+ }
+ active_crawler_mod.mod = NULL;
}
if (settings.verbose > 2)
@@ -1626,7 +1764,7 @@ int start_item_crawler_thread(void) {
/* 'remaining' is passed in so the LRU maintainer thread can scrub the whole
* LRU every time.
*/
-static int do_lru_crawler_start(uint32_t id, uint32_t remaining, enum crawler_run_type type) {
+static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
int i;
uint32_t sid;
uint32_t tocrawl[3];
@@ -1649,16 +1787,6 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining, enum crawler_ru
crawlers[sid].time = 0;
crawlers[sid].remaining = remaining;
crawlers[sid].slabs_clsid = sid;
- crawlers[sid].type = type;
- switch (type) {
- case CRAWLER_METADUMP:
- crawlers[sid].eval = item_crawler_metadump;
- break;
- case CRAWLER_EXPIRED:
- default:
- crawlers[sid].eval = item_crawler_evaluate;
- break;
- }
crawler_link_q((item *)&crawlers[sid]);
crawler_count++;
starts++;
@@ -1670,58 +1798,71 @@ static int do_lru_crawler_start(uint32_t id, uint32_t remaining, enum crawler_ru
stats_state.lru_crawler_running = true;
stats.lru_crawler_starts++;
STATS_UNLOCK();
- pthread_mutex_lock(&lru_crawler_stats_lock);
- memset(&crawlerstats[id], 0, sizeof(crawlerstats_t));
- crawlerstats[id].start_time = current_time;
- pthread_mutex_unlock(&lru_crawler_stats_lock);
}
return starts;
}
-static int lru_crawler_start(uint32_t id, uint32_t remaining, const enum crawler_run_type type) {
- int starts;
- if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
- return 0;
+static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd) {
+ crawler_client_t *crawlc = &cm->c;
+ if (crawlc->c != NULL) {
+ return -1;
}
- starts = do_lru_crawler_start(id, remaining, type);
- if (starts) {
- pthread_cond_signal(&lru_crawler_cond);
+ crawlc->c = c;
+ crawlc->sfd = sfd;
+
+ crawlc->buf = bipbuf_new(1024 * 128);
+ if (crawlc->buf == NULL) {
+ return -2;
}
- pthread_mutex_unlock(&lru_crawler_lock);
- return starts;
+ return 0;
}
-/* FIXME: Temporary hack since we can't yet pass this information into
- * lru_crawler_crawl.. which has the proper locks/etc.
- * Multiple parallel commands could race, but isn't part of the testing.
- */
-int lru_crawler_set_client(void *c, const int sfd) {
- if (crawler_client.c != NULL) {
+static int lru_crawler_start(uint8_t *ids, uint32_t remaining,
+ const enum crawler_run_type type, void *data,
+ void *c, const int sfd) {
+ int starts = 0;
+ if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
return -1;
}
- crawler_client.c = c;
- crawler_client.sfd = sfd;
- crawler_client.buf = bipbuf_new(1024 * 128);
- if (crawler_client.buf == NULL) {
- return -2;
+ /* Configure the module */
+ assert(crawler_mod_regs[type] != NULL);
+ active_crawler_mod.mod = crawler_mod_regs[type];
+ if (active_crawler_mod.mod->init != NULL) {
+ active_crawler_mod.mod->init(&active_crawler_mod, data);
}
- return 0;
+ if (active_crawler_mod.mod->needs_client) {
+ if (c == NULL || sfd == 0) {
+ pthread_mutex_unlock(&lru_crawler_lock);
+ return -2;
+ }
+ if (lru_crawler_set_client(&active_crawler_mod, c, sfd) != 0) {
+ pthread_mutex_unlock(&lru_crawler_lock);
+ return -2;
+ }
+ }
+
+ for (int sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
+ if (ids[sid]) {
+ starts += do_lru_crawler_start(sid, remaining);
+ }
+ }
+ if (starts) {
+ pthread_cond_signal(&lru_crawler_cond);
+ }
+ pthread_mutex_unlock(&lru_crawler_lock);
+ return starts;
}
-/* FIXME: Split this into two functions: one to kick a crawler for a sid, and one to
- * parse the string. LRU maintainer code is generating a string to set up a
- * sid.
+/*
* Also only clear the crawlerstats once per sid.
*/
-enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type) {
+enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_type type,
+ void *c, const int sfd) {
char *b = NULL;
uint32_t sid = 0;
int starts = 0;
uint8_t tocrawl[MAX_NUMBER_OF_SLAB_CLASSES];
- if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
- return CRAWLER_RUNNING;
- }
/* FIXME: I added this while debugging. Don't think it's needed? */
memset(tocrawl, 0, sizeof(uint8_t) * MAX_NUMBER_OF_SLAB_CLASSES);
@@ -1743,16 +1884,15 @@ enum crawler_result_type lru_crawler_crawl(char *slabs, const enum crawler_run_t
}
}
- for (sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
- if (tocrawl[sid])
- starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl, type);
- }
- if (starts) {
- pthread_cond_signal(&lru_crawler_cond);
- pthread_mutex_unlock(&lru_crawler_lock);
+ starts = lru_crawler_start(tocrawl, settings.lru_crawler_tocrawl,
+ type, NULL, c, sfd);
+ if (starts == -1) {
+ return CRAWLER_RUNNING;
+ } else if (starts == -2) {
+ return CRAWLER_ERROR; /* FIXME: not very helpful. */
+ } else if (starts) {
return CRAWLER_OK;
} else {
- pthread_mutex_unlock(&lru_crawler_lock);
return CRAWLER_NOTSTARTED;
}
}
@@ -1768,12 +1908,14 @@ void lru_crawler_resume(void) {
int init_lru_crawler(void) {
if (lru_crawler_initialized == 0) {
- memset(&crawlerstats, 0, sizeof(crawlerstats_t) * MAX_NUMBER_OF_SLAB_CLASSES);
if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) {
fprintf(stderr, "Can't initialize lru crawler condition\n");
return -1;
}
pthread_mutex_init(&lru_crawler_lock, NULL);
+ active_crawler_mod.c.c = NULL;
+ active_crawler_mod.mod = NULL;
+ active_crawler_mod.data = NULL;
lru_crawler_initialized = 1;
}
return 0;
diff --git a/items.h b/items.h
index 848f8c0..d9d99e3 100644
--- a/items.h
+++ b/items.h
@@ -35,7 +35,7 @@ void item_stats_reset(void);
extern pthread_mutex_t lru_locks[POWER_LARGEST];
enum crawler_result_type {
- CRAWLER_OK=0, CRAWLER_RUNNING, CRAWLER_BADCLASS, CRAWLER_NOTSTARTED
+ CRAWLER_OK=0, CRAWLER_RUNNING, CRAWLER_BADCLASS, CRAWLER_NOTSTARTED, CRAWLER_ERROR
};
int start_lru_maintainer_thread(void);
@@ -47,7 +47,6 @@ void lru_maintainer_resume(void);
int start_item_crawler_thread(void);
int stop_item_crawler_thread(void);
int init_lru_crawler(void);
-enum crawler_result_type lru_crawler_crawl(char *slabs, enum crawler_run_type);
-int lru_crawler_set_client(void *c, const int sfd); /* FIXME: Temporary. */
+enum crawler_result_type lru_crawler_crawl(char *slabs, enum crawler_run_type, void *c, const int sfd);
void lru_crawler_pause(void);
void lru_crawler_resume(void);
diff --git a/memcached.c b/memcached.c
index c090343..bc3636c 100644
--- a/memcached.c
+++ b/memcached.c
@@ -3976,7 +3976,7 @@ static void process_command(conn *c, char *command) {
return;
}
- rv = lru_crawler_crawl(tokens[2].value, CRAWLER_EXPIRED);
+ rv = lru_crawler_crawl(tokens[2].value, CRAWLER_EXPIRED, NULL, 0);
switch(rv) {
case CRAWLER_OK:
out_string(c, "OK");
@@ -3990,6 +3990,9 @@ static void process_command(conn *c, char *command) {
case CRAWLER_NOTSTARTED:
out_string(c, "NOTSTARTED no items to crawl");
break;
+ case CRAWLER_ERROR:
+ out_string(c, "ERROR an unknown error happened");
+ break;
}
return;
} else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "metadump") == 0) {
@@ -3998,9 +4001,8 @@ static void process_command(conn *c, char *command) {
return;
}
- // FIXME: check response code.
- lru_crawler_set_client(c, c->sfd);
- int rv = lru_crawler_crawl(tokens[2].value, CRAWLER_METADUMP);
+ int rv = lru_crawler_crawl(tokens[2].value, CRAWLER_METADUMP,
+ c, c->sfd);
switch(rv) {
case CRAWLER_OK:
out_string(c, "OK");
@@ -4017,6 +4019,9 @@ static void process_command(conn *c, char *command) {
case CRAWLER_NOTSTARTED:
out_string(c, "NOTSTARTED no items to crawl");
break;
+ case CRAWLER_ERROR:
+ out_string(c, "ERROR an unknown error happened");
+ break;
}
return;
} else if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "tocrawl") == 0) {
diff --git a/memcached.h b/memcached.h
index dd1fd6c..359c122 100644
--- a/memcached.h
+++ b/memcached.h
@@ -419,8 +419,6 @@ typedef struct _stritem {
/* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
-typedef void (*crawler_eval_func)(item *it, uint32_t hv, int slab_cls);
-
// TODO: If we eventually want user loaded modules, we can't use an enum :(
enum crawler_run_type {
CRAWLER_EXPIRED=0, CRAWLER_METADUMP
@@ -439,8 +437,6 @@ typedef struct {
uint8_t slabs_clsid;/* which slab class we're in */
uint8_t nkey; /* key length, w/terminating null and padding */
uint32_t remaining; /* Max keys to crawl per slab per invocation */
- enum crawler_run_type type; /* which module to use during run */
- crawler_eval_func eval; /* The function to run with the locked item */
} crawler;
/* Header when an item is actually a chunk of another item. */