diff options
-rw-r--r-- | crawler.c | 180 |
1 files changed, 103 insertions, 77 deletions
@@ -27,8 +27,9 @@ typedef struct { void *c; /* original connection structure. still with source thread attached. */ int sfd; /* client fd. */ - bipbuf_t *buf; /* output buffer */ - char *cbuf; /* current buffer */ + int buflen; + int bufused; + char *buf; /* output buffer */ } crawler_client_t; typedef struct _crawler_module_t crawler_module_t; @@ -86,7 +87,7 @@ crawler_module_reg_t *crawler_mod_regs[3] = { &crawler_metadump_mod }; -static int lru_crawler_client_getbuf(crawler_client_t *c); +static int lru_crawler_write(crawler_client_t *c); crawler_module_t active_crawler_mod; enum crawler_run_type active_crawler_type; @@ -107,14 +108,13 @@ static void *storage; /*** LRU CRAWLER THREAD ***/ -#define LRU_CRAWLER_WRITEBUF 8192 +#define LRU_CRAWLER_MINBUFSPACE 8192 static void lru_crawler_close_client(crawler_client_t *c) { //fprintf(stderr, "CRAWLER: Closing client\n"); sidethread_conn_close(c->c); c->c = NULL; - c->cbuf = NULL; - bipbuf_free(c->buf); + free(c->buf); c->buf = NULL; } @@ -122,11 +122,20 @@ static void lru_crawler_release_client(crawler_client_t *c) { //fprintf(stderr, "CRAWLER: Closing client\n"); redispatch_conn(c->c); c->c = NULL; - c->cbuf = NULL; - bipbuf_free(c->buf); + free(c->buf); c->buf = NULL; } +static int lru_crawler_expand_buf(crawler_client_t *c) { + c->buflen *= 2; + char *nb = realloc(c->buf, c->buflen); + if (nb == NULL) { + return -1; + } + c->buf = nb; + return 0; +} + static int crawler_expired_init(crawler_module_t *cm, void *data) { struct crawler_expired_data *d; if (data != NULL) { @@ -236,7 +245,6 @@ static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv } static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, int i) { - //int slab_id = CLEAR_LRU(i); char keybuf[KEY_MAX_URI_ENCODED_LENGTH]; int is_flushed = item_is_flushed(it); /* Ignore expired content. */ @@ -247,7 +255,7 @@ static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, i } // TODO: uriencode directly into the buffer. uriencode(ITEM_key(it), keybuf, it->nkey, KEY_MAX_URI_ENCODED_LENGTH); - int total = snprintf(cm->c.cbuf, 4096, + int total = snprintf(cm->c.buf + cm->c.bufused, 4096, "key=%s exp=%ld la=%llu cas=%llu fetch=%s cls=%u size=%lu\n", keybuf, (it->exptime == 0) ? -1 : (long)(it->exptime + process_started), @@ -257,53 +265,61 @@ static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, i ITEM_clsid(it), (unsigned long) ITEM_ntotal(it)); refcount_decr(it); - // TODO: some way of tracking the errors. these are very unlikely though. - if (total >= LRU_CRAWLER_WRITEBUF - 1 || total <= 0) { - /* Failed to write, don't push it. */ + // TODO: some way of tracking the errors. these should be impossible given + // the space requirements. + if (total >= LRU_CRAWLER_MINBUFSPACE - 1 || total <= 0) { + // Failed to write, don't push it. return; } - bipbuf_push(cm->c.buf, total); + cm->c.bufused += total; } static void crawler_metadump_finalize(crawler_module_t *cm) { if (cm->c.c != NULL) { - // Ensure space for final message. - lru_crawler_client_getbuf(&cm->c); - memcpy(cm->c.cbuf, "END\r\n", 5); - bipbuf_push(cm->c.buf, 5); + lru_crawler_write(&cm->c); // empty the write buffer + memcpy(cm->c.buf, "END\r\n", 5); + cm->c.bufused += 5; } } -static int lru_crawler_poll(crawler_client_t *c) { - unsigned char *data; - unsigned int data_size = 0; +// write the whole buffer out to the client socket. +static int lru_crawler_write(crawler_client_t *c) { + unsigned int data_size = c->bufused; + unsigned int sent = 0; struct pollfd to_poll[1]; to_poll[0].fd = c->sfd; to_poll[0].events = POLLOUT; - int ret = poll(to_poll, 1, 1000); - - if (ret < 0) { - // fatal. - return -1; - } + if (c->c == NULL) return -1; + if (data_size == 0) return 0; - if (ret == 0) return 0; + while (sent < data_size) { + int ret = poll(to_poll, 1, 1000); - if (to_poll[0].revents & POLLIN) { - char buf[1]; - int res = ((conn*)c->c)->read(c->c, buf, 1); - if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) { - lru_crawler_close_client(c); + if (ret < 0) { + // fatal. return -1; } - } - if ((data = bipbuf_peek_all(c->buf, &data_size)) != NULL) { + + if (ret == 0) return 0; + + // check if socket was closed on us. + if (to_poll[0].revents & POLLIN) { + char buf[1]; + int res = ((conn*)c->c)->read(c->c, buf, 1); + if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) { + lru_crawler_close_client(c); + return -1; + } + } + if (to_poll[0].revents & (POLLHUP|POLLERR)) { + // got socket hangup. lru_crawler_close_client(c); return -1; } else if (to_poll[0].revents & POLLOUT) { - int total = ((conn*)c->c)->write(c->c, data, data_size); + // socket is writeable. + int total = ((conn*)c->c)->write(c->c, c->buf + sent, data_size - sent); if (total == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK) { lru_crawler_close_client(c); @@ -312,29 +328,14 @@ static int lru_crawler_poll(crawler_client_t *c) { } else if (total == 0) { lru_crawler_close_client(c); return -1; - } else { - bipbuf_poll(c->buf, total); } + sent += total; } - } - return 0; -} + } // while -/* Grab some space to work with, if none exists, run the poll() loop and wait - * for it to clear up or close. - * Return NULL if closed. - */ -static int lru_crawler_client_getbuf(crawler_client_t *c) { - void *buf = NULL; - if (c->c == NULL) return -1; - /* not enough space. */ - while ((buf = bipbuf_request(c->buf, LRU_CRAWLER_WRITEBUF)) == NULL) { - // TODO: max loops before closing. - int ret = lru_crawler_poll(c); - if (ret < 0) return ret; - } + // write buffer now empty. + c->bufused = 0; - c->cbuf = buf; return 0; } @@ -349,22 +350,39 @@ static void lru_crawler_class_done(int i) { active_crawler_mod.mod->doneclass(&active_crawler_mod, i); } +// ensure we build the buffer a little bit to cut down on poll/write syscalls. +#define MIN_ITEMS_PER_WRITE 16 static void item_crawl_hash(void) { // get iterator from assoc. can hang for a long time. // - blocks hash expansion void *iter = assoc_get_iterator(); int crawls_persleep = settings.crawls_persleep; item *it = NULL; + int items = 0; // loop while iterator returns something // - iterator func handles bucket-walking // - iterator returns with bucket locked. while (assoc_iterate(iter, &it)) { // if iterator returns true but no item, we're inbetween buckets and - // can do sleep or cleanup work without holding a lock. + // can do cleanup work without holding an item lock. if (it == NULL) { + if (active_crawler_mod.c.c != NULL) { + if (items > MIN_ITEMS_PER_WRITE) { + int ret = lru_crawler_write(&active_crawler_mod.c); + items = 0; + if (ret != 0) { + // fail out and finalize. + break; + } + } + } else if (active_crawler_mod.mod->needs_client) { + // fail out and finalize. + break; + } + // - sleep bits from orig loop - if (crawls_persleep-- <= 0 && settings.lru_crawler_sleep) { + if (crawls_persleep <= 0 && settings.lru_crawler_sleep) { pthread_mutex_unlock(&lru_crawler_lock); usleep(settings.lru_crawler_sleep); pthread_mutex_lock(&lru_crawler_lock); @@ -377,27 +395,29 @@ static void item_crawl_hash(void) { continue; } - /* Get memory from bipbuf, if client has no space, flush. */ - if (active_crawler_mod.c.c != NULL) { - int ret = lru_crawler_client_getbuf(&active_crawler_mod.c); - if (ret != 0) { - // fail out and finalize. - break; - } - } else if (active_crawler_mod.mod->needs_client) { - // fail out and finalize. - break; - } - // double check that the item isn't in a transitional state. if (refcount_incr(it) < 2) { refcount_decr(it); continue; } + // We're presently holding an item lock, so we cannot flush the + // buffer to the network socket as the syscall is both slow and could + // hang waiting for POLLOUT. Instead we must expand the buffer. + if (active_crawler_mod.c.c != NULL) { + crawler_client_t *c = &active_crawler_mod.c; + if (c->buflen - c->bufused < LRU_CRAWLER_MINBUFSPACE) { + if (lru_crawler_expand_buf(c) != 0) { + // failed to expand buffer, stop. + break; + } + } + } // FIXME: missing hv and i are fine for metadump eval, but not fine // for expire eval. active_crawler_mod.mod->eval(&active_crawler_mod, it, 0, 0); + crawls_persleep--; + items++; } // must finalize or we leave the hash table expansion blocked. @@ -430,12 +450,14 @@ static void *item_crawler_thread(void *arg) { continue; } - /* Get memory from bipbuf, if client has no space, flush. */ if (active_crawler_mod.c.c != NULL) { - int ret = lru_crawler_client_getbuf(&active_crawler_mod.c); - if (ret != 0) { - lru_crawler_class_done(i); - continue; + crawler_client_t *c = &active_crawler_mod.c; + if (c->buflen - c->bufused < LRU_CRAWLER_MINBUFSPACE) { + int ret = lru_crawler_write(c); + if (ret != 0) { + lru_crawler_class_done(i); + continue; + } } } else if (active_crawler_mod.mod->needs_client) { lru_crawler_class_done(i); @@ -500,8 +522,8 @@ static void *item_crawler_thread(void *arg) { if (active_crawler_mod.mod != NULL) { if (active_crawler_mod.mod->finalize != NULL) active_crawler_mod.mod->finalize(&active_crawler_mod); - while (active_crawler_mod.c.c != NULL && bipbuf_used(active_crawler_mod.c.buf)) { - lru_crawler_poll(&active_crawler_mod.c); + while (active_crawler_mod.c.c != NULL && active_crawler_mod.c.bufused != 0) { + lru_crawler_write(&active_crawler_mod.c); } // Double checking in case the client closed during the poll if (active_crawler_mod.c.c != NULL) { @@ -626,10 +648,14 @@ static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd) crawlc->c = c; crawlc->sfd = sfd; - crawlc->buf = bipbuf_new(1024 * 128); + size_t size = LRU_CRAWLER_MINBUFSPACE * 16; + crawlc->buf = malloc(size); + if (crawlc->buf == NULL) { return -2; } + crawlc->buflen = size; + crawlc->bufused = 0; return 0; } |