summaryrefslogtreecommitdiff
path: root/crawler.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-02-18 16:20:56 -0800
committerdormando <dormando@rydia.net>2023-02-27 09:22:39 -0800
commita22be2bdbff1bb80be87071aad1caf648d15722b (patch)
treeec22205589b6a5a2190c89ecefdc1025a314863a /crawler.c
parentf012c9cb3516933b0fce54e12d358e697c24863d (diff)
downloadmemcached-a22be2bdbff1bb80be87071aad1caf648d15722b.tar.gz
crawler: don't hold lock while writing to network
LRU crawler in per-LRU mode only flushes the write buffer to the client socket while not holding important locks. The hash table iterator version of the crawler was accidentally holding an item lock while flushing to the network. Item locks must NOT be held for long periods of time as they will cause the daemon to lag. Originally the code used a circular buffer for writing to the network; this allowed it to easily do partial write flushes to the socket and continue filling the other half of the buffer. Fixing this requires the buffer be resizeable, so we instead use a straight buffer allocation. The write buffer must be large enough to handle all items within a hash table bucket. Hash table buckets are _supposed_ to max out at an average depth of 1.5 items, so in theory it should never resize. However it's possible to go higher if a user clamps the hash table size. There could also be larger than average buckets naturally due to the hash algorithm and luck.
Diffstat (limited to 'crawler.c')
-rw-r--r--crawler.c180
1 files changed, 103 insertions, 77 deletions
diff --git a/crawler.c b/crawler.c
index c8361b5..f333585 100644
--- a/crawler.c
+++ b/crawler.c
@@ -27,8 +27,9 @@
typedef struct {
void *c; /* original connection structure. still with source thread attached. */
int sfd; /* client fd. */
- bipbuf_t *buf; /* output buffer */
- char *cbuf; /* current buffer */
+ int buflen;
+ int bufused;
+ char *buf; /* output buffer */
} crawler_client_t;
typedef struct _crawler_module_t crawler_module_t;
@@ -86,7 +87,7 @@ crawler_module_reg_t *crawler_mod_regs[3] = {
&crawler_metadump_mod
};
-static int lru_crawler_client_getbuf(crawler_client_t *c);
+static int lru_crawler_write(crawler_client_t *c);
crawler_module_t active_crawler_mod;
enum crawler_run_type active_crawler_type;
@@ -107,14 +108,13 @@ static void *storage;
/*** LRU CRAWLER THREAD ***/
-#define LRU_CRAWLER_WRITEBUF 8192
+#define LRU_CRAWLER_MINBUFSPACE 8192
static void lru_crawler_close_client(crawler_client_t *c) {
//fprintf(stderr, "CRAWLER: Closing client\n");
sidethread_conn_close(c->c);
c->c = NULL;
- c->cbuf = NULL;
- bipbuf_free(c->buf);
+ free(c->buf);
c->buf = NULL;
}
@@ -122,11 +122,20 @@ static void lru_crawler_release_client(crawler_client_t *c) {
//fprintf(stderr, "CRAWLER: Closing client\n");
redispatch_conn(c->c);
c->c = NULL;
- c->cbuf = NULL;
- bipbuf_free(c->buf);
+ free(c->buf);
c->buf = NULL;
}
+static int lru_crawler_expand_buf(crawler_client_t *c) {
+ c->buflen *= 2;
+ char *nb = realloc(c->buf, c->buflen);
+ if (nb == NULL) {
+ return -1;
+ }
+ c->buf = nb;
+ return 0;
+}
+
static int crawler_expired_init(crawler_module_t *cm, void *data) {
struct crawler_expired_data *d;
if (data != NULL) {
@@ -236,7 +245,6 @@ static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv
}
static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) {
- //int slab_id = CLEAR_LRU(i);
char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
int is_flushed = item_is_flushed(it);
/* Ignore expired content. */
@@ -247,7 +255,7 @@ static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, i
}
// TODO: uriencode directly into the buffer.
uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_URI_ENCODED_LENGTH);
- int total = snprintf(cm->c.cbuf, 4096,
+ int total = snprintf(cm->c.buf + cm->c.bufused, 4096,
"key=%s exp=%ld la=%llu cas=%llu fetch=%s cls=%u size=%lu\n",
keybuf,
(it->exptime == 0) ? -1 : (long)(it->exptime + process_started),
@@ -257,53 +265,61 @@ static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, i
ITEM_clsid(it),
(unsigned long) ITEM_ntotal(it));
refcount_decr(it);
- // TODO: some way of tracking the errors. these are very unlikely though.
- if (total >= LRU_CRAWLER_WRITEBUF - 1 || total <= 0) {
- /* Failed to write, don't push it. */
+ // TODO: some way of tracking the errors. these should be impossible given
+ // the space requirements.
+ if (total >= LRU_CRAWLER_MINBUFSPACE - 1 || total <= 0) {
+ // Failed to write, don't push it.
return;
}
- bipbuf_push(cm->c.buf, total);
+ cm->c.bufused += total;
}
static void crawler_metadump_finalize(crawler_module_t *cm) {
if (cm->c.c != NULL) {
- // Ensure space for final message.
- lru_crawler_client_getbuf(&cm->c);
- memcpy(cm->c.cbuf, "END\r\n", 5);
- bipbuf_push(cm->c.buf, 5);
+ lru_crawler_write(&cm->c); // empty the write buffer
+ memcpy(cm->c.buf, "END\r\n", 5);
+ cm->c.bufused += 5;
}
}
-static int lru_crawler_poll(crawler_client_t *c) {
- unsigned char *data;
- unsigned int data_size = 0;
+// write the whole buffer out to the client socket.
+static int lru_crawler_write(crawler_client_t *c) {
+ unsigned int data_size = c->bufused;
+ unsigned int sent = 0;
struct pollfd to_poll[1];
to_poll[0].fd = c->sfd;
to_poll[0].events = POLLOUT;
- int ret = poll(to_poll, 1, 1000);
-
- if (ret < 0) {
- // fatal.
- return -1;
- }
+ if (c->c == NULL) return -1;
+ if (data_size == 0) return 0;
- if (ret == 0) return 0;
+ while (sent < data_size) {
+ int ret = poll(to_poll, 1, 1000);
- if (to_poll[0].revents & POLLIN) {
- char buf[1];
- int res = ((conn*)c->c)->read(c->c, buf, 1);
- if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
- lru_crawler_close_client(c);
+ if (ret < 0) {
+ // fatal.
return -1;
}
- }
- if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) {
+
+ if (ret == 0) return 0;
+
+ // check if socket was closed on us.
+ if (to_poll[0].revents & POLLIN) {
+ char buf[1];
+ int res = ((conn*)c->c)->read(c->c, buf, 1);
+ if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
+ lru_crawler_close_client(c);
+ return -1;
+ }
+ }
+
if (to_poll[0].revents & (POLLHUP|POLLERR)) {
+ // got socket hangup.
lru_crawler_close_client(c);
return -1;
} else if (to_poll[0].revents & POLLOUT) {
- int total = ((conn*)c->c)->write(c->c, data, data_size);
+ // socket is writeable.
+ int total = ((conn*)c->c)->write(c->c, c->buf + sent, data_size - sent);
if (total == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
lru_crawler_close_client(c);
@@ -312,29 +328,14 @@ static int lru_crawler_poll(crawler_client_t *c) {
} else if (total == 0) {
lru_crawler_close_client(c);
return -1;
- } else {
- bipbuf_poll(c->buf, total);
}
+ sent += total;
}
- }
- return 0;
-}
+ } // while
-/* Grab some space to work with, if none exists, run the poll() loop and wait
- * for it to clear up or close.
- * Return NULL if closed.
- */
-static int lru_crawler_client_getbuf(crawler_client_t *c) {
- void *buf = NULL;
- if (c->c == NULL) return -1;
- /* not enough space. */
- while ((buf = bipbuf_request(c->buf, LRU_CRAWLER_WRITEBUF)) == NULL) {
- // TODO: max loops before closing.
- int ret = lru_crawler_poll(c);
- if (ret < 0) return ret;
- }
+ // write buffer now empty.
+ c->bufused = 0;
- c->cbuf = buf;
return 0;
}
@@ -349,22 +350,39 @@ static void lru_crawler_class_done(int i) {
active_crawler_mod.mod->doneclass(&active_crawler_mod, i);
}
+// ensure we build the buffer a little bit to cut down on poll/write syscalls.
+#define MIN_ITEMS_PER_WRITE 16
static void item_crawl_hash(void) {
// get iterator from assoc. can hang for a long time.
// - blocks hash expansion
void *iter = assoc_get_iterator();
int crawls_persleep = settings.crawls_persleep;
item *it = NULL;
+ int items = 0;
// loop while iterator returns something
// - iterator func handles bucket-walking
// - iterator returns with bucket locked.
while (assoc_iterate(iter, &it)) {
// if iterator returns true but no item, we're inbetween buckets and
- // can do sleep or cleanup work without holding a lock.
+ // can do cleanup work without holding an item lock.
if (it == NULL) {
+ if (active_crawler_mod.c.c != NULL) {
+ if (items > MIN_ITEMS_PER_WRITE) {
+ int ret = lru_crawler_write(&active_crawler_mod.c);
+ items = 0;
+ if (ret != 0) {
+ // fail out and finalize.
+ break;
+ }
+ }
+ } else if (active_crawler_mod.mod->needs_client) {
+ // fail out and finalize.
+ break;
+ }
+
// - sleep bits from orig loop
- if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
+ if (crawls_persleep <= 0 && settings.lru_crawler_sleep) {
pthread_mutex_unlock(&lru_crawler_lock);
usleep(settings.lru_crawler_sleep);
pthread_mutex_lock(&lru_crawler_lock);
@@ -377,27 +395,29 @@ static void item_crawl_hash(void) {
continue;
}
- /* Get memory from bipbuf, if client has no space, flush. */
- if (active_crawler_mod.c.c != NULL) {
- int ret = lru_crawler_client_getbuf(&active_crawler_mod.c);
- if (ret != 0) {
- // fail out and finalize.
- break;
- }
- } else if (active_crawler_mod.mod->needs_client) {
- // fail out and finalize.
- break;
- }
-
// double check that the item isn't in a transitional state.
if (refcount_incr(it) < 2) {
refcount_decr(it);
continue;
}
+ // We're presently holding an item lock, so we cannot flush the
+ // buffer to the network socket as the syscall is both slow and could
+ // hang waiting for POLLOUT. Instead we must expand the buffer.
+ if (active_crawler_mod.c.c != NULL) {
+ crawler_client_t *c = &active_crawler_mod.c;
+ if (c->buflen - c->bufused < LRU_CRAWLER_MINBUFSPACE) {
+ if (lru_crawler_expand_buf(c) != 0) {
+ // failed to expand buffer, stop.
+ break;
+ }
+ }
+ }
// FIXME: missing hv and i are fine for metadump eval, but not fine
// for expire eval.
active_crawler_mod.mod->eval(&active_crawler_mod, it, 0, 0);
+ crawls_persleep--;
+ items++;
}
// must finalize or we leave the hash table expansion blocked.
@@ -430,12 +450,14 @@ static void *item_crawler_thread(void *arg) {
continue;
}
- /* Get memory from bipbuf, if client has no space, flush. */
if (active_crawler_mod.c.c != NULL) {
- int ret = lru_crawler_client_getbuf(&active_crawler_mod.c);
- if (ret != 0) {
- lru_crawler_class_done(i);
- continue;
+ crawler_client_t *c = &active_crawler_mod.c;
+ if (c->buflen - c->bufused < LRU_CRAWLER_MINBUFSPACE) {
+ int ret = lru_crawler_write(c);
+ if (ret != 0) {
+ lru_crawler_class_done(i);
+ continue;
+ }
}
} else if (active_crawler_mod.mod->needs_client) {
lru_crawler_class_done(i);
@@ -500,8 +522,8 @@ static void *item_crawler_thread(void *arg) {
if (active_crawler_mod.mod != NULL) {
if (active_crawler_mod.mod->finalize != NULL)
active_crawler_mod.mod->finalize(&active_crawler_mod);
- while (active_crawler_mod.c.c != NULL && bipbuf_used(active_crawler_mod.c.buf)) {
- lru_crawler_poll(&active_crawler_mod.c);
+ while (active_crawler_mod.c.c != NULL && active_crawler_mod.c.bufused != 0) {
+ lru_crawler_write(&active_crawler_mod.c);
}
// Double checking in case the client closed during the poll
if (active_crawler_mod.c.c != NULL) {
@@ -626,10 +648,14 @@ static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd)
crawlc->c = c;
crawlc->sfd = sfd;
- crawlc->buf = bipbuf_new(1024 * 128);
+ size_t size = LRU_CRAWLER_MINBUFSPACE * 16;
+ crawlc->buf = malloc(size);
+
if (crawlc->buf == NULL) {
return -2;
}
+ crawlc->buflen = size;
+ crawlc->bufused = 0;
return 0;
}