diff options
Diffstat (limited to 'logger.c')
-rw-r--r-- | logger.c | 306 |
1 files changed, 170 insertions, 136 deletions
@@ -1,14 +1,26 @@ /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* need this to get IOV_MAX on some platforms. */ +#ifndef __need_IOV_MAX +#define __need_IOV_MAX +#endif #include <stdlib.h> #include <string.h> #include <errno.h> #include <poll.h> +#include <limits.h> #include "memcached.h" #include "bipbuffer.h" -#define LOGGER_DEBUG 0 +/* FreeBSD 4.x doesn't have IOV_MAX exposed. */ +#ifndef IOV_MAX +#if defined(__FreeBSD__) || defined(__APPLE__) || defined(__GNU__) +# define IOV_MAX 1024 +#endif +#endif + +#define LOGGER_DEBUG 1 #if LOGGER_DEBUG #define L_DEBUG(...) \ @@ -31,8 +43,6 @@ pthread_mutex_t logger_stack_lock = PTHREAD_MUTEX_INITIALIZER; pthread_key_t logger_key; bipbuf_t *logger_thread_buf = NULL; -logger_chunk *logger_thread_last_lc = NULL; -int logger_thread_lc_count = 0; #if !defined(HAVE_GCC_ATOMICS) && !defined(__sun) pthread_mutex_t logger_atomics_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -45,8 +55,8 @@ int watcher_count = 0; /* Should this go somewhere else? */ static const entry_details default_entries[] = { - [LOGGER_ASCII_CMD] = {LOGGER_TEXT_ENTRY, 512, "<%d %s"}, - [LOGGER_EVICTION] = {LOGGER_EVICTION_ENTRY, 512, "eviction: [key %s] [fetched: %s] [ttl: %d] [la: %d]"} + [LOGGER_ASCII_CMD] = {LOGGER_TEXT_ENTRY, 512, LOG_RAWCMDS, "<%d %s"}, + [LOGGER_EVICTION] = {LOGGER_EVICTION_ENTRY, 512, LOG_EVICTIONS, "eviction: [key %s] [fetched: %s] [ttl: %d] [la: %d]"} }; static void logger_poll_watchers(int force_poll); @@ -106,83 +116,37 @@ static void logger_link_q(logger *l) { return; }*/ -/* Centralized log chunk buffer. All worker threads dequeue into this buffer, - * which gets written out to subscribed watchers. - * The "latest" global chunk is tracked. - * New chunks get attached to each active watcher, increasing refcount. - * Watchers flush from this buffer (tracking w->flushed) until buffer is full. - * When full, a new buffer is created and chained after the old one (lc->next) - * Watchers then catch up, releasing ref's on old chunks as they complete - * flushing. - * When a chunk is fully flushed by all watchers, it's released back to the - * central bipbuf. - * When totally out of chunk space, the most-behind watchers are closed. - * - * This fiddling is done to avoid excess memcpy'ing. We write directly into - * the central bipbuf rather than copy into it, and multiple readers access - * it. - */ -static logger_chunk *logger_get_chunk(void) { - logger_chunk *lc = logger_thread_last_lc; - int x; - - if (lc && lc->filled == 0) - return lc; - - lc = (logger_chunk *) bipbuf_request(logger_thread_buf, (8 * 1024)); - if (lc == NULL) { - L_DEBUG("LOGGER: Chunk space full.\n"); - return NULL; - } - bipbuf_push(logger_thread_buf, (8 * 1024)); - L_DEBUG("LOGGER: Creating new chunk\n"); - memset(lc, 0, sizeof(logger_chunk)); - lc->size = (8 * 1024) - sizeof(logger_chunk); - - /* Each watcher gets a reference to this new chunk */ - for (x = 0; x < WATCHER_LIMIT; x++) { - logger_watcher *w = watchers[x]; - if (w == NULL) - continue; - - if (w->lc == NULL) { - /* Watcher doesn't already have a chunk. */ - w->lc = lc; - } - lc->refcount++; - w->chunks++; - } - - if (logger_thread_last_lc) { - assert(logger_thread_last_lc != lc); - logger_thread_last_lc->next = lc; - } - logger_thread_last_lc = lc; - logger_thread_lc_count++; - - return lc; -} - /* return FULLBUF to tell caller buffer is full. * caller will then flush and retry log parse */ -static enum logger_parse_entry_ret logger_parse_entry(logger_chunk *lc, logentry *e) { +static enum logger_parse_entry_ret logger_parse_entry(bipbuf_t *buf, logentry *e) { int total = 0; + /* "64" needs better definition, since we're extending the length of the + * logline a bit. + */ + int esize = sizeof(logentry) + e->size + 64; + logentry *newe = (logentry *) bipbuf_request(buf, esize); + if (newe == NULL) { + return LOGGER_PARSE_ENTRY_FULLBUF; + } + /* Not bothering copying the whole struct header. will probably come up + * with a different shorter struct for this. + */ + newe->watcher_flag = e->watcher_flag; switch (e->event) { case LOGGER_TEXT_ENTRY: case LOGGER_EVICTION_ENTRY: - total = snprintf(lc->data + lc->written, (lc->size - lc->written), "[%d.%d] [%llu] %s\n", + total = snprintf((char *) newe->data, esize, "[%d.%d] [%llu] %s\n", (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid, (char *) e->data); - if (total >= (lc->size - lc->written) || total <= 0) { - /* ran out of space. don't advance written, run a flush, - * retry write? - */ - L_DEBUG("LOGGER: Chunk filled\n"); - lc->filled = 1; - return LOGGER_PARSE_ENTRY_FULLBUF; + if (total >= esize || total <= 0) { + /* FIXME: This is now a much more fatal error. need to make it + * not crash though. */ + L_DEBUG("LOGGER: Failed to flatten log entry!\n"); + return LOGGER_PARSE_ENTRY_FAILED; } else { - lc->written += total; + newe->size = total + 1; + bipbuf_push(buf, sizeof(logentry) + newe->size); } break; } @@ -190,19 +154,6 @@ static enum logger_parse_entry_ret logger_parse_entry(logger_chunk *lc, logentry return LOGGER_PARSE_ENTRY_OK; } -static void logger_chunk_release(logger_chunk *lc) { - lc->refcount--; - if (lc->refcount == 0) { - L_DEBUG("LOGGER: Releasing logger chunk\n"); - if (lc->next == NULL) { - L_DEBUG("LOGGER: Clearing last LC chunk reference\n"); - logger_thread_last_lc = NULL; - } - bipbuf_poll(logger_thread_buf, (8 * 1024)); - logger_thread_lc_count--; - } -} - /* Called with logger stack locked. * Iterates over every watcher collecting enabled flags. */ @@ -231,13 +182,7 @@ static void logger_set_flags(void) { * processing. */ static void logger_close_watcher(logger_watcher *w) { - logger_chunk *lc = w->lc; L_DEBUG("LOGGER: Closing dead watcher\n"); - while (lc != NULL) { - logger_chunk *nlc = lc->next; - logger_chunk_release(lc); - lc = nlc; - } watchers[w->id] = NULL; sidethread_conn_close(w->c); watcher_count--; @@ -245,22 +190,56 @@ static void logger_close_watcher(logger_watcher *w) { logger_set_flags(); } +static void logger_push_central_buf(void) { + int x; + logger_watcher *w; + int min_flushed = INT_MAX; + /* If something is set into min_flushed, we're able to advance the central + * buffer forward by min_flushed bytes. + */ + for (x = 0; x < WATCHER_LIMIT; x++) { + if (watchers[x] == NULL) + continue; + w = watchers[x]; + if (w->min_flushed < min_flushed) + min_flushed = w->min_flushed; + } + if (min_flushed != 0) { + //L_DEBUG("LOGGER: min_flushed [%d], advancing central buffer\n", min_flushed); + for (x = 0; x < WATCHER_LIMIT; x++) { + if (watchers[x] == NULL) + continue; + assert(watchers[x]->flushed - min_flushed >= 0); + watchers[x]->flushed -= min_flushed; + watchers[x]->min_flushed -= min_flushed; + } + bipbuf_poll(logger_thread_buf, min_flushed); + } +} + /* Pick any number of "oldest" behind-watchers and kill them. */ -static void logger_kill_watchers(const int count) { +static void logger_kill_watchers(void) { int x; logger_watcher *w; + int min_flushed = INT_MAX; + int min_flushed_watcher = -1; for (x = 0; x < WATCHER_LIMIT; x++) { w = watchers[x]; if (w == NULL) continue; - if (w->lc && w->chunks == count) { - L_DEBUG("LOGGER: Killing watcher [%d] because it's too behind (%d)\n", - w->sfd, w->chunks); - logger_close_watcher(w); + if (w->min_flushed < min_flushed) { + min_flushed = w->min_flushed; + min_flushed_watcher = x; } } + if (min_flushed_watcher > -1) { + fprintf(stderr, "LOGGER: Killing watcher [%d] because of low flush bytes (%d)\n", + watchers[min_flushed_watcher]->sfd, min_flushed); + logger_close_watcher(watchers[min_flushed_watcher]); + logger_push_central_buf(); + } } /* Reads a particular worker thread's available bipbuf bytes. Parses each log @@ -272,8 +251,8 @@ static int logger_thread_read(logger *l) { unsigned int size; unsigned int pos = 0; unsigned char *data; + unsigned int was_full = 0; logentry *e; - logger_chunk *lc; pthread_mutex_lock(&l->mutex); data = bipbuf_peek_all(l->buf, &size); pthread_mutex_unlock(&l->mutex); @@ -281,27 +260,28 @@ static int logger_thread_read(logger *l) { if (data == NULL) { return 0; } - L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size); + //L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size); /* parse buffer */ while (pos < size && watcher_count > 0) { enum logger_parse_entry_ret ret; e = (logentry *) (data + pos); - lc = logger_get_chunk(); - if (lc == NULL) { - /* out of buffer space. show must go on, so kill something. */ - logger_kill_watchers(logger_thread_lc_count); - continue; - } - ret = logger_parse_entry(lc, e); + ret = logger_parse_entry(logger_thread_buf, e); if (ret == LOGGER_PARSE_ENTRY_FULLBUF) { /* Buffer was full. Push the last up, force an early flush. */ - logger_poll_watchers(0); + if (was_full) { + /* out of buffer space. show must go on, so kill something. */ + logger_kill_watchers(); + } else { + logger_poll_watchers(0); + was_full = 1; + } continue; } else if (ret != LOGGER_PARSE_ENTRY_OK) { fprintf(stderr, "LOGGER: Failed to parse log entry\n"); abort(); } + was_full = 0; pos += sizeof(logentry) + e->size; } assert(pos <= size); @@ -342,24 +322,20 @@ static void logger_poll_watchers(int force_poll) { int x; int nfd = 0; logger_watcher *w; + unsigned char *data; + unsigned int data_size = 0; + struct iovec iov[IOV_MAX]; + int iovcnt = 0; + data = bipbuf_peek_all(logger_thread_buf, &data_size); + if (data == NULL && force_poll == 0) + return; for (x = 0; x < WATCHER_LIMIT; x++) { w = watchers[x]; if (w == NULL) continue; - /* If this chunk has been flushed, release it and start the next one. - * Next chunk may be null, which stops writing. */ - if (w->lc != NULL && w->flushed >= w->lc->written && w->lc->filled) { - logger_chunk *nlc = w->lc->next; - logger_chunk_release(w->lc); - /* More buffers to eat */ - w->lc = nlc; - w->chunks--; - w->flushed = 0; - } - - if (w->lc != NULL && w->lc->written > w->flushed) { + if (data_size > w->flushed) { watchers_pollfds[nfd].fd = w->sfd; watchers_pollfds[nfd].events = POLLOUT; nfd++; @@ -370,10 +346,14 @@ static void logger_poll_watchers(int force_poll) { } } + /* FIXME: Verify if this is necessary. */ + if (data != NULL && force_poll == 0) + assert(nfd != 0); + if (nfd == 0) return; - L_DEBUG("LOGGER: calling poll()\n"); + L_DEBUG("LOGGER: calling poll() [data_size: %d]\n", data_size); int ret = poll(watchers_pollfds, nfd, 0); if (ret < 0) { @@ -386,7 +366,6 @@ static void logger_poll_watchers(int force_poll) { w = watchers[x]; if (w == NULL) continue; - /* Early detection of a disconnect. Otherwise we have to wait until * the next write */ @@ -399,40 +378,96 @@ static void logger_poll_watchers(int force_poll) { continue; } } - if (w->lc != NULL) { + if (data_size > w->flushed) { if (watchers_pollfds[nfd].revents & (POLLHUP|POLLERR)) { L_DEBUG("LOGGER: watcher closed during poll() call\n"); logger_close_watcher(w); } else if (watchers_pollfds[nfd].revents & POLLOUT) { int total = 0; + /* To account for a partial write into the iovec, pos should + * loop and skip logentry's until the next one would be + * larger, then set the iov_base as an offset into the + * remainder. + */ + unsigned int pos = w->flushed; + iovcnt = 0; + while (pos < data_size) { + logentry *e = (logentry *) (data + pos); + int esize = sizeof(logentry) + e->size; + int flushed = 0; + if (pos + esize < w->flushed) { + pos += esize; + continue; + } else if (pos < w->flushed) { + /* We have a remainder. */ + flushed = w->flushed - (pos + sizeof(logentry)); + assert(pos + flushed <= w->flushed); + } else if ((e->watcher_flag & w->eflags) == 0) { + /* If we're ahead of flushed but we aren't listening, + * we advance flushed and skip this event + */ + //L_DEBUG("LOGGER: Skipped an event for [%d] (eflags: %d) (watcher_flag: %d)\n", + // w->sfd, w->eflags, e->watcher_flag); + pos += esize; + w->flushed += esize; + w->min_flushed += esize; + continue; + } + iov[iovcnt].iov_base = (e->data + flushed); + iov[iovcnt].iov_len = e->size - flushed; + //L_DEBUG("LOGGER: incrementing pos [%d] by %d [esize: %d]\n", pos, esize, e->size); + pos += esize; + iovcnt++; + } + + if (iovcnt == 0) { + /* Turns out we skipped all of the events! */ + continue; + } + /* We can write a bit. */ switch (w->t) { case LOGGER_WATCHER_STDERR: - total = fwrite(w->lc->data + w->flushed, 1, w->lc->written - w->flushed, stderr); + total = writev(STDERR_FILENO, iov, iovcnt); break; case LOGGER_WATCHER_CLIENT: - total = write(w->sfd, w->lc->data + w->flushed, w->lc->written - w->flushed); + total = writev(w->sfd, iov, iovcnt); break; } - L_DEBUG("LOGGER: poll() wrote %d to %d (written: %d) (flushed: %d)\n", total, w->sfd, - w->lc->written, w->flushed); + L_DEBUG("LOGGER: poll() wrote %d to %d (data_size: %d) (flushed: %d)\n", total, w->sfd, + data_size, w->flushed); if (total == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK) { logger_close_watcher(w); } L_DEBUG("LOGGER: watcher hit EAGAIN\n"); - nfd++; - continue; } else if (total == 0) { logger_close_watcher(w); + } else { + int rem = total; + int x; + for (x = 0; x < iovcnt; x++) { + rem -= iov[x].iov_len; + if (rem >= 0) { + w->flushed += iov[x].iov_len + sizeof(logentry); + } else { + L_DEBUG("LOGGER: remainder %d\n", rem); + /* We have a remainder. do count logentry + * struct at this time. */ + rem = abs(rem) + sizeof(logentry); + w->flushed += rem; + break; + } + } + w->min_flushed = w->flushed - rem; } - - w->flushed += total; } nfd++; } } + + logger_push_central_buf(); } #define MAX_LOGGER_SLEEP 100000 @@ -571,6 +606,7 @@ enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, cons } e->gid = gid; e->event = d->subtype; + e->watcher_flag = d->watcher_flag; SET_LOGGER_TIME(); switch (d->subtype) { @@ -622,6 +658,7 @@ enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, cons enum logger_add_watcher_ret logger_add_watcher(void *c, const int sfd, uint16_t f) { int x; logger_watcher *w = NULL; + unsigned int size = 0; pthread_mutex_lock(&logger_stack_lock); if (watcher_count >= WATCHER_LIMIT) { return LOGGER_ADD_WATCHER_TOO_MANY; @@ -644,13 +681,10 @@ enum logger_add_watcher_ret logger_add_watcher(void *c, const int sfd, uint16_t } w->id = x; w->eflags = f; - /* Attach to an existing log chunk if there is one */ - if (logger_thread_last_lc && !logger_thread_last_lc->filled) { - logger_chunk *lc = logger_thread_last_lc; - lc->refcount++; - w->lc = lc; - w->flushed = lc->written; /* only write logs after we attach */ - w->chunks++; + /* Skip any currently queued data, so we only print new lines. */ + if (bipbuf_peek_all(logger_thread_buf, &size) != NULL) { + w->flushed = size; + w->min_flushed = size; } watchers[x] = w; watcher_count++; |