summaryrefslogtreecommitdiff
path: root/crawler.c
diff options
context:
space:
mode:
Diffstat (limited to 'crawler.c')
-rw-r--r--crawler.c180
1 files changed, 103 insertions, 77 deletions
diff --git a/crawler.c b/crawler.c
index c8361b5..f333585 100644
--- a/crawler.c
+++ b/crawler.c
@@ -27,8 +27,9 @@
typedef struct {
void *c; /* original connection structure. still with source thread attached. */
int sfd; /* client fd. */
- bipbuf_t *buf; /* output buffer */
- char *cbuf; /* current buffer */
+ int buflen;
+ int bufused;
+ char *buf; /* output buffer */
} crawler_client_t;
typedef struct _crawler_module_t crawler_module_t;
@@ -86,7 +87,7 @@ crawler_module_reg_t *crawler_mod_regs[3] = {
&crawler_metadump_mod
};
-static int lru_crawler_client_getbuf(crawler_client_t *c);
+static int lru_crawler_write(crawler_client_t *c);
crawler_module_t active_crawler_mod;
enum crawler_run_type active_crawler_type;
@@ -107,14 +108,13 @@ static void *storage;
/*** LRU CRAWLER THREAD ***/
-#define LRU_CRAWLER_WRITEBUF 8192
+#define LRU_CRAWLER_MINBUFSPACE 8192
static void lru_crawler_close_client(crawler_client_t *c) {
//fprintf(stderr, "CRAWLER: Closing client\n");
sidethread_conn_close(c->c);
c->c = NULL;
- c->cbuf = NULL;
- bipbuf_free(c->buf);
+ free(c->buf);
c->buf = NULL;
}
@@ -122,11 +122,20 @@ static void lru_crawler_release_client(crawler_client_t *c) {
//fprintf(stderr, "CRAWLER: Closing client\n");
redispatch_conn(c->c);
c->c = NULL;
- c->cbuf = NULL;
- bipbuf_free(c->buf);
+ free(c->buf);
c->buf = NULL;
}
+static int lru_crawler_expand_buf(crawler_client_t *c) {
+ c->buflen *= 2;
+ char *nb = realloc(c->buf, c->buflen);
+ if (nb == NULL) {
+ return -1;
+ }
+ c->buf = nb;
+ return 0;
+}
+
static int crawler_expired_init(crawler_module_t *cm, void *data) {
struct crawler_expired_data *d;
if (data != NULL) {
@@ -236,7 +245,6 @@ static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv
}
static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) {
- //int slab_id = CLEAR_LRU(i);
char keybuf[KEY_MAX_URI_ENCODED_LENGTH];
int is_flushed = item_is_flushed(it);
/* Ignore expired content. */
@@ -247,7 +255,7 @@ static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, i
}
// TODO: uriencode directly into the buffer.
uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_URI_ENCODED_LENGTH);
- int total = snprintf(cm->c.cbuf, 4096,
+ int total = snprintf(cm->c.buf + cm->c.bufused, 4096,
"key=%s exp=%ld la=%llu cas=%llu fetch=%s cls=%u size=%lu\n",
keybuf,
(it->exptime == 0) ? -1 : (long)(it->exptime + process_started),
@@ -257,53 +265,61 @@ static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, i
ITEM_clsid(it),
(unsigned long) ITEM_ntotal(it));
refcount_decr(it);
- // TODO: some way of tracking the errors. these are very unlikely though.
- if (total >= LRU_CRAWLER_WRITEBUF - 1 || total <= 0) {
- /* Failed to write, don't push it. */
+ // TODO: some way of tracking the errors. these should be impossible given
+ // the space requirements.
+ if (total >= LRU_CRAWLER_MINBUFSPACE - 1 || total <= 0) {
+ // Failed to write, don't push it.
return;
}
- bipbuf_push(cm->c.buf, total);
+ cm->c.bufused += total;
}
static void crawler_metadump_finalize(crawler_module_t *cm) {
if (cm->c.c != NULL) {
- // Ensure space for final message.
- lru_crawler_client_getbuf(&cm->c);
- memcpy(cm->c.cbuf, "END\r\n", 5);
- bipbuf_push(cm->c.buf, 5);
+ lru_crawler_write(&cm->c); // empty the write buffer
+ memcpy(cm->c.buf, "END\r\n", 5);
+ cm->c.bufused += 5;
}
}
-static int lru_crawler_poll(crawler_client_t *c) {
- unsigned char *data;
- unsigned int data_size = 0;
+// write the whole buffer out to the client socket.
+static int lru_crawler_write(crawler_client_t *c) {
+ unsigned int data_size = c->bufused;
+ unsigned int sent = 0;
struct pollfd to_poll[1];
to_poll[0].fd = c->sfd;
to_poll[0].events = POLLOUT;
- int ret = poll(to_poll, 1, 1000);
-
- if (ret < 0) {
- // fatal.
- return -1;
- }
+ if (c->c == NULL) return -1;
+ if (data_size == 0) return 0;
- if (ret == 0) return 0;
+ while (sent < data_size) {
+ int ret = poll(to_poll, 1, 1000);
- if (to_poll[0].revents & POLLIN) {
- char buf[1];
- int res = ((conn*)c->c)->read(c->c, buf, 1);
- if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
- lru_crawler_close_client(c);
+ if (ret < 0) {
+ // fatal.
return -1;
}
- }
- if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) {
+
+ if (ret == 0) return 0;
+
+ // check if socket was closed on us.
+ if (to_poll[0].revents & POLLIN) {
+ char buf[1];
+ int res = ((conn*)c->c)->read(c->c, buf, 1);
+ if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
+ lru_crawler_close_client(c);
+ return -1;
+ }
+ }
+
if (to_poll[0].revents & (POLLHUP|POLLERR)) {
+ // got socket hangup.
lru_crawler_close_client(c);
return -1;
} else if (to_poll[0].revents & POLLOUT) {
- int total = ((conn*)c->c)->write(c->c, data, data_size);
+ // socket is writeable.
+ int total = ((conn*)c->c)->write(c->c, c->buf + sent, data_size - sent);
if (total == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
lru_crawler_close_client(c);
@@ -312,29 +328,14 @@ static int lru_crawler_poll(crawler_client_t *c) {
} else if (total == 0) {
lru_crawler_close_client(c);
return -1;
- } else {
- bipbuf_poll(c->buf, total);
}
+ sent += total;
}
- }
- return 0;
-}
+ } // while
-/* Grab some space to work with, if none exists, run the poll() loop and wait
- * for it to clear up or close.
- * Return NULL if closed.
- */
-static int lru_crawler_client_getbuf(crawler_client_t *c) {
- void *buf = NULL;
- if (c->c == NULL) return -1;
- /* not enough space. */
- while ((buf = bipbuf_request(c->buf, LRU_CRAWLER_WRITEBUF)) == NULL) {
- // TODO: max loops before closing.
- int ret = lru_crawler_poll(c);
- if (ret < 0) return ret;
- }
+ // write buffer now empty.
+ c->bufused = 0;
- c->cbuf = buf;
return 0;
}
@@ -349,22 +350,39 @@ static void lru_crawler_class_done(int i) {
active_crawler_mod.mod->doneclass(&active_crawler_mod, i);
}
+// ensure we build the buffer a little bit to cut down on poll/write syscalls.
+#define MIN_ITEMS_PER_WRITE 16
static void item_crawl_hash(void) {
// get iterator from assoc. can hang for a long time.
// - blocks hash expansion
void *iter = assoc_get_iterator();
int crawls_persleep = settings.crawls_persleep;
item *it = NULL;
+ int items = 0;
// loop while iterator returns something
// - iterator func handles bucket-walking
// - iterator returns with bucket locked.
while (assoc_iterate(iter, &it)) {
// if iterator returns true but no item, we're inbetween buckets and
- // can do sleep or cleanup work without holding a lock.
+ // can do cleanup work without holding an item lock.
if (it == NULL) {
+ if (active_crawler_mod.c.c != NULL) {
+ if (items > MIN_ITEMS_PER_WRITE) {
+ int ret = lru_crawler_write(&active_crawler_mod.c);
+ items = 0;
+ if (ret != 0) {
+ // fail out and finalize.
+ break;
+ }
+ }
+ } else if (active_crawler_mod.mod->needs_client) {
+ // fail out and finalize.
+ break;
+ }
+
// - sleep bits from orig loop
- if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) {
+ if (crawls_persleep <= 0 && settings.lru_crawler_sleep) {
pthread_mutex_unlock(&lru_crawler_lock);
usleep(settings.lru_crawler_sleep);
pthread_mutex_lock(&lru_crawler_lock);
@@ -377,27 +395,29 @@ static void item_crawl_hash(void) {
continue;
}
- /* Get memory from bipbuf, if client has no space, flush. */
- if (active_crawler_mod.c.c != NULL) {
- int ret = lru_crawler_client_getbuf(&active_crawler_mod.c);
- if (ret != 0) {
- // fail out and finalize.
- break;
- }
- } else if (active_crawler_mod.mod->needs_client) {
- // fail out and finalize.
- break;
- }
-
// double check that the item isn't in a transitional state.
if (refcount_incr(it) < 2) {
refcount_decr(it);
continue;
}
+ // We're presently holding an item lock, so we cannot flush the
+ // buffer to the network socket as the syscall is both slow and could
+ // hang waiting for POLLOUT. Instead we must expand the buffer.
+ if (active_crawler_mod.c.c != NULL) {
+ crawler_client_t *c = &active_crawler_mod.c;
+ if (c->buflen - c->bufused < LRU_CRAWLER_MINBUFSPACE) {
+ if (lru_crawler_expand_buf(c) != 0) {
+ // failed to expand buffer, stop.
+ break;
+ }
+ }
+ }
// FIXME: missing hv and i are fine for metadump eval, but not fine
// for expire eval.
active_crawler_mod.mod->eval(&active_crawler_mod, it, 0, 0);
+ crawls_persleep--;
+ items++;
}
// must finalize or we leave the hash table expansion blocked.
@@ -430,12 +450,14 @@ static void *item_crawler_thread(void *arg) {
continue;
}
- /* Get memory from bipbuf, if client has no space, flush. */
if (active_crawler_mod.c.c != NULL) {
- int ret = lru_crawler_client_getbuf(&active_crawler_mod.c);
- if (ret != 0) {
- lru_crawler_class_done(i);
- continue;
+ crawler_client_t *c = &active_crawler_mod.c;
+ if (c->buflen - c->bufused < LRU_CRAWLER_MINBUFSPACE) {
+ int ret = lru_crawler_write(c);
+ if (ret != 0) {
+ lru_crawler_class_done(i);
+ continue;
+ }
}
} else if (active_crawler_mod.mod->needs_client) {
lru_crawler_class_done(i);
@@ -500,8 +522,8 @@ static void *item_crawler_thread(void *arg) {
if (active_crawler_mod.mod != NULL) {
if (active_crawler_mod.mod->finalize != NULL)
active_crawler_mod.mod->finalize(&active_crawler_mod);
- while (active_crawler_mod.c.c != NULL && bipbuf_used(active_crawler_mod.c.buf)) {
- lru_crawler_poll(&active_crawler_mod.c);
+ while (active_crawler_mod.c.c != NULL && active_crawler_mod.c.bufused != 0) {
+ lru_crawler_write(&active_crawler_mod.c);
}
// Double checking in case the client closed during the poll
if (active_crawler_mod.c.c != NULL) {
@@ -626,10 +648,14 @@ static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd)
crawlc->c = c;
crawlc->sfd = sfd;
- crawlc->buf = bipbuf_new(1024 * 128);
+ size_t size = LRU_CRAWLER_MINBUFSPACE * 16;
+ crawlc->buf = malloc(size);
+
if (crawlc->buf == NULL) {
return -2;
}
+ crawlc->buflen = size;
+ crawlc->bufused = 0;
return 0;
}