summaryrefslogtreecommitdiff
path: root/logger.c
diff options
context:
space:
mode:
Diffstat (limited to 'logger.c')
-rw-r--r--logger.c306
1 files changed, 170 insertions, 136 deletions
diff --git a/logger.c b/logger.c
index 49f2383..3180e30 100644
--- a/logger.c
+++ b/logger.c
@@ -1,14 +1,26 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/* need this to get IOV_MAX on some platforms. */
+#ifndef __need_IOV_MAX
+#define __need_IOV_MAX
+#endif
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
+#include <limits.h>
#include "memcached.h"
#include "bipbuffer.h"
-#define LOGGER_DEBUG 0
+/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
+#ifndef IOV_MAX
+#if defined(__FreeBSD__) || defined(__APPLE__) || defined(__GNU__)
+# define IOV_MAX 1024
+#endif
+#endif
+
+#define LOGGER_DEBUG 1
#if LOGGER_DEBUG
#define L_DEBUG(...) \
@@ -31,8 +43,6 @@ pthread_mutex_t logger_stack_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_key_t logger_key;
bipbuf_t *logger_thread_buf = NULL;
-logger_chunk *logger_thread_last_lc = NULL;
-int logger_thread_lc_count = 0;
#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t logger_atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -45,8 +55,8 @@ int watcher_count = 0;
/* Should this go somewhere else? */
static const entry_details default_entries[] = {
- [LOGGER_ASCII_CMD] = {LOGGER_TEXT_ENTRY, 512, "<%d %s"},
- [LOGGER_EVICTION] = {LOGGER_EVICTION_ENTRY, 512, "eviction: [key %s] [fetched: %s] [ttl: %d] [la: %d]"}
+ [LOGGER_ASCII_CMD] = {LOGGER_TEXT_ENTRY, 512, LOG_RAWCMDS, "<%d %s"},
+ [LOGGER_EVICTION] = {LOGGER_EVICTION_ENTRY, 512, LOG_EVICTIONS, "eviction: [key %s] [fetched: %s] [ttl: %d] [la: %d]"}
};
static void logger_poll_watchers(int force_poll);
@@ -106,83 +116,37 @@ static void logger_link_q(logger *l) {
return;
}*/
-/* Centralized log chunk buffer. All worker threads dequeue into this buffer,
- * which gets written out to subscribed watchers.
- * The "latest" global chunk is tracked.
- * New chunks get attached to each active watcher, increasing refcount.
- * Watchers flush from this buffer (tracking w->flushed) until buffer is full.
- * When full, a new buffer is created and chained after the old one (lc->next)
- * Watchers then catch up, releasing ref's on old chunks as they complete
- * flushing.
- * When a chunk is fully flushed by all watchers, it's released back to the
- * central bipbuf.
- * When totally out of chunk space, the most-behind watchers are closed.
- *
- * This fiddling is done to avoid excess memcpy'ing. We write directly into
- * the central bipbuf rather than copy into it, and multiple readers access
- * it.
- */
-static logger_chunk *logger_get_chunk(void) {
- logger_chunk *lc = logger_thread_last_lc;
- int x;
-
- if (lc && lc->filled == 0)
- return lc;
-
- lc = (logger_chunk *) bipbuf_request(logger_thread_buf, (8 * 1024));
- if (lc == NULL) {
- L_DEBUG("LOGGER: Chunk space full.\n");
- return NULL;
- }
- bipbuf_push(logger_thread_buf, (8 * 1024));
- L_DEBUG("LOGGER: Creating new chunk\n");
- memset(lc, 0, sizeof(logger_chunk));
- lc->size = (8 * 1024) - sizeof(logger_chunk);
-
- /* Each watcher gets a reference to this new chunk */
- for (x = 0; x < WATCHER_LIMIT; x++) {
- logger_watcher *w = watchers[x];
- if (w == NULL)
- continue;
-
- if (w->lc == NULL) {
- /* Watcher doesn't already have a chunk. */
- w->lc = lc;
- }
- lc->refcount++;
- w->chunks++;
- }
-
- if (logger_thread_last_lc) {
- assert(logger_thread_last_lc != lc);
- logger_thread_last_lc->next = lc;
- }
- logger_thread_last_lc = lc;
- logger_thread_lc_count++;
-
- return lc;
-}
-
/* return FULLBUF to tell caller buffer is full.
* caller will then flush and retry log parse
*/
-static enum logger_parse_entry_ret logger_parse_entry(logger_chunk *lc, logentry *e) {
+static enum logger_parse_entry_ret logger_parse_entry(bipbuf_t *buf, logentry *e) {
int total = 0;
+ /* "64" needs better definition, since we're extending the length of the
+ * logline a bit.
+ */
+ int esize = sizeof(logentry) + e->size + 64;
+ logentry *newe = (logentry *) bipbuf_request(buf, esize);
+ if (newe == NULL) {
+ return LOGGER_PARSE_ENTRY_FULLBUF;
+ }
+ /* Not bothering copying the whole struct header. will probably come up
+ * with a different shorter struct for this.
+ */
+ newe->watcher_flag = e->watcher_flag;
switch (e->event) {
case LOGGER_TEXT_ENTRY:
case LOGGER_EVICTION_ENTRY:
- total = snprintf(lc->data + lc->written, (lc->size - lc->written), "[%d.%d] [%llu] %s\n",
+ total = snprintf((char *) newe->data, esize, "[%d.%d] [%llu] %s\n",
(int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid, (char *) e->data);
- if (total >= (lc->size - lc->written) || total <= 0) {
- /* ran out of space. don't advance written, run a flush,
- * retry write?
- */
- L_DEBUG("LOGGER: Chunk filled\n");
- lc->filled = 1;
- return LOGGER_PARSE_ENTRY_FULLBUF;
+ if (total >= esize || total <= 0) {
+ /* FIXME: This is now a much more fatal error. need to make it
+ * not crash though. */
+ L_DEBUG("LOGGER: Failed to flatten log entry!\n");
+ return LOGGER_PARSE_ENTRY_FAILED;
} else {
- lc->written += total;
+ newe->size = total + 1;
+ bipbuf_push(buf, sizeof(logentry) + newe->size);
}
break;
}
@@ -190,19 +154,6 @@ static enum logger_parse_entry_ret logger_parse_entry(logger_chunk *lc, logentry
return LOGGER_PARSE_ENTRY_OK;
}
-static void logger_chunk_release(logger_chunk *lc) {
- lc->refcount--;
- if (lc->refcount == 0) {
- L_DEBUG("LOGGER: Releasing logger chunk\n");
- if (lc->next == NULL) {
- L_DEBUG("LOGGER: Clearing last LC chunk reference\n");
- logger_thread_last_lc = NULL;
- }
- bipbuf_poll(logger_thread_buf, (8 * 1024));
- logger_thread_lc_count--;
- }
-}
-
/* Called with logger stack locked.
* Iterates over every watcher collecting enabled flags.
*/
@@ -231,13 +182,7 @@ static void logger_set_flags(void) {
* processing.
*/
static void logger_close_watcher(logger_watcher *w) {
- logger_chunk *lc = w->lc;
L_DEBUG("LOGGER: Closing dead watcher\n");
- while (lc != NULL) {
- logger_chunk *nlc = lc->next;
- logger_chunk_release(lc);
- lc = nlc;
- }
watchers[w->id] = NULL;
sidethread_conn_close(w->c);
watcher_count--;
@@ -245,22 +190,56 @@ static void logger_close_watcher(logger_watcher *w) {
logger_set_flags();
}
+static void logger_push_central_buf(void) {
+ int x;
+ logger_watcher *w;
+ int min_flushed = INT_MAX;
+ /* If something is set into min_flushed, we're able to advance the central
+ * buffer forward by min_flushed bytes.
+ */
+ for (x = 0; x < WATCHER_LIMIT; x++) {
+ if (watchers[x] == NULL)
+ continue;
+ w = watchers[x];
+ if (w->min_flushed < min_flushed)
+ min_flushed = w->min_flushed;
+ }
+ if (min_flushed != 0) {
+ //L_DEBUG("LOGGER: min_flushed [%d], advancing central buffer\n", min_flushed);
+ for (x = 0; x < WATCHER_LIMIT; x++) {
+ if (watchers[x] == NULL)
+ continue;
+ assert(watchers[x]->flushed - min_flushed >= 0);
+ watchers[x]->flushed -= min_flushed;
+ watchers[x]->min_flushed -= min_flushed;
+ }
+ bipbuf_poll(logger_thread_buf, min_flushed);
+ }
+}
+
/* Pick any number of "oldest" behind-watchers and kill them. */
-static void logger_kill_watchers(const int count) {
+static void logger_kill_watchers(void) {
int x;
logger_watcher *w;
+ int min_flushed = INT_MAX;
+ int min_flushed_watcher = -1;
for (x = 0; x < WATCHER_LIMIT; x++) {
w = watchers[x];
if (w == NULL)
continue;
- if (w->lc && w->chunks == count) {
- L_DEBUG("LOGGER: Killing watcher [%d] because it's too behind (%d)\n",
- w->sfd, w->chunks);
- logger_close_watcher(w);
+ if (w->min_flushed < min_flushed) {
+ min_flushed = w->min_flushed;
+ min_flushed_watcher = x;
}
}
+ if (min_flushed_watcher > -1) {
+ fprintf(stderr, "LOGGER: Killing watcher [%d] because of low flush bytes (%d)\n",
+ watchers[min_flushed_watcher]->sfd, min_flushed);
+ logger_close_watcher(watchers[min_flushed_watcher]);
+ logger_push_central_buf();
+ }
}
/* Reads a particular worker thread's available bipbuf bytes. Parses each log
@@ -272,8 +251,8 @@ static int logger_thread_read(logger *l) {
unsigned int size;
unsigned int pos = 0;
unsigned char *data;
+ unsigned int was_full = 0;
logentry *e;
- logger_chunk *lc;
pthread_mutex_lock(&l->mutex);
data = bipbuf_peek_all(l->buf, &size);
pthread_mutex_unlock(&l->mutex);
@@ -281,27 +260,28 @@ static int logger_thread_read(logger *l) {
if (data == NULL) {
return 0;
}
- L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size);
+ //L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size);
/* parse buffer */
while (pos < size && watcher_count > 0) {
enum logger_parse_entry_ret ret;
e = (logentry *) (data + pos);
- lc = logger_get_chunk();
- if (lc == NULL) {
- /* out of buffer space. show must go on, so kill something. */
- logger_kill_watchers(logger_thread_lc_count);
- continue;
- }
- ret = logger_parse_entry(lc, e);
+ ret = logger_parse_entry(logger_thread_buf, e);
if (ret == LOGGER_PARSE_ENTRY_FULLBUF) {
/* Buffer was full. Push the last up, force an early flush. */
- logger_poll_watchers(0);
+ if (was_full) {
+ /* out of buffer space. show must go on, so kill something. */
+ logger_kill_watchers();
+ } else {
+ logger_poll_watchers(0);
+ was_full = 1;
+ }
continue;
} else if (ret != LOGGER_PARSE_ENTRY_OK) {
fprintf(stderr, "LOGGER: Failed to parse log entry\n");
abort();
}
+ was_full = 0;
pos += sizeof(logentry) + e->size;
}
assert(pos <= size);
@@ -342,24 +322,20 @@ static void logger_poll_watchers(int force_poll) {
int x;
int nfd = 0;
logger_watcher *w;
+ unsigned char *data;
+ unsigned int data_size = 0;
+ struct iovec iov[IOV_MAX];
+ int iovcnt = 0;
+ data = bipbuf_peek_all(logger_thread_buf, &data_size);
+ if (data == NULL && force_poll == 0)
+ return;
for (x = 0; x < WATCHER_LIMIT; x++) {
w = watchers[x];
if (w == NULL)
continue;
- /* If this chunk has been flushed, release it and start the next one.
- * Next chunk may be null, which stops writing. */
- if (w->lc != NULL && w->flushed >= w->lc->written && w->lc->filled) {
- logger_chunk *nlc = w->lc->next;
- logger_chunk_release(w->lc);
- /* More buffers to eat */
- w->lc = nlc;
- w->chunks--;
- w->flushed = 0;
- }
-
- if (w->lc != NULL && w->lc->written > w->flushed) {
+ if (data_size > w->flushed) {
watchers_pollfds[nfd].fd = w->sfd;
watchers_pollfds[nfd].events = POLLOUT;
nfd++;
@@ -370,10 +346,14 @@ static void logger_poll_watchers(int force_poll) {
}
}
+ /* FIXME: Verify if this is necessary. */
+ if (data != NULL && force_poll == 0)
+ assert(nfd != 0);
+
if (nfd == 0)
return;
- L_DEBUG("LOGGER: calling poll()\n");
+ L_DEBUG("LOGGER: calling poll() [data_size: %d]\n", data_size);
int ret = poll(watchers_pollfds, nfd, 0);
if (ret < 0) {
@@ -386,7 +366,6 @@ static void logger_poll_watchers(int force_poll) {
w = watchers[x];
if (w == NULL)
continue;
-
/* Early detection of a disconnect. Otherwise we have to wait until
* the next write
*/
@@ -399,40 +378,96 @@ static void logger_poll_watchers(int force_poll) {
continue;
}
}
- if (w->lc != NULL) {
+ if (data_size > w->flushed) {
if (watchers_pollfds[nfd].revents & (POLLHUP|POLLERR)) {
L_DEBUG("LOGGER: watcher closed during poll() call\n");
logger_close_watcher(w);
} else if (watchers_pollfds[nfd].revents & POLLOUT) {
int total = 0;
+ /* To account for a partial write into the iovec, pos should
+ * loop and skip logentry's until the next one would be
+ * larger, then set the iov_base as an offset into the
+ * remainder.
+ */
+ unsigned int pos = w->flushed;
+ iovcnt = 0;
+ while (pos < data_size) {
+ logentry *e = (logentry *) (data + pos);
+ int esize = sizeof(logentry) + e->size;
+ int flushed = 0;
+ if (pos + esize < w->flushed) {
+ pos += esize;
+ continue;
+ } else if (pos < w->flushed) {
+ /* We have a remainder. */
+ flushed = w->flushed - (pos + sizeof(logentry));
+ assert(pos + flushed <= w->flushed);
+ } else if ((e->watcher_flag & w->eflags) == 0) {
+ /* If we're ahead of flushed but we aren't listening,
+ * we advance flushed and skip this event
+ */
+ //L_DEBUG("LOGGER: Skipped an event for [%d] (eflags: %d) (watcher_flag: %d)\n",
+ // w->sfd, w->eflags, e->watcher_flag);
+ pos += esize;
+ w->flushed += esize;
+ w->min_flushed += esize;
+ continue;
+ }
+ iov[iovcnt].iov_base = (e->data + flushed);
+ iov[iovcnt].iov_len = e->size - flushed;
+ //L_DEBUG("LOGGER: incrementing pos [%d] by %d [esize: %d]\n", pos, esize, e->size);
+ pos += esize;
+ iovcnt++;
+ }
+
+ if (iovcnt == 0) {
+ /* Turns out we skipped all of the events! */
+ continue;
+ }
+
/* We can write a bit. */
switch (w->t) {
case LOGGER_WATCHER_STDERR:
- total = fwrite(w->lc->data + w->flushed, 1, w->lc->written - w->flushed, stderr);
+ total = writev(STDERR_FILENO, iov, iovcnt);
break;
case LOGGER_WATCHER_CLIENT:
- total = write(w->sfd, w->lc->data + w->flushed, w->lc->written - w->flushed);
+ total = writev(w->sfd, iov, iovcnt);
break;
}
- L_DEBUG("LOGGER: poll() wrote %d to %d (written: %d) (flushed: %d)\n", total, w->sfd,
- w->lc->written, w->flushed);
+ L_DEBUG("LOGGER: poll() wrote %d to %d (data_size: %d) (flushed: %d)\n", total, w->sfd,
+ data_size, w->flushed);
if (total == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
logger_close_watcher(w);
}
L_DEBUG("LOGGER: watcher hit EAGAIN\n");
- nfd++;
- continue;
} else if (total == 0) {
logger_close_watcher(w);
+ } else {
+ int rem = total;
+ int x;
+ for (x = 0; x < iovcnt; x++) {
+ rem -= iov[x].iov_len;
+ if (rem >= 0) {
+ w->flushed += iov[x].iov_len + sizeof(logentry);
+ } else {
+ L_DEBUG("LOGGER: remainder %d\n", rem);
+ /* We have a remainder. do count logentry
+ * struct at this time. */
+ rem = abs(rem) + sizeof(logentry);
+ w->flushed += rem;
+ break;
+ }
+ }
+ w->min_flushed = w->flushed - rem;
}
-
- w->flushed += total;
}
nfd++;
}
}
+
+ logger_push_central_buf();
}
#define MAX_LOGGER_SLEEP 100000
@@ -571,6 +606,7 @@ enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, cons
}
e->gid = gid;
e->event = d->subtype;
+ e->watcher_flag = d->watcher_flag;
SET_LOGGER_TIME();
switch (d->subtype) {
@@ -622,6 +658,7 @@ enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, cons
enum logger_add_watcher_ret logger_add_watcher(void *c, const int sfd, uint16_t f) {
int x;
logger_watcher *w = NULL;
+ unsigned int size = 0;
pthread_mutex_lock(&logger_stack_lock);
if (watcher_count >= WATCHER_LIMIT) {
return LOGGER_ADD_WATCHER_TOO_MANY;
@@ -644,13 +681,10 @@ enum logger_add_watcher_ret logger_add_watcher(void *c, const int sfd, uint16_t
}
w->id = x;
w->eflags = f;
- /* Attach to an existing log chunk if there is one */
- if (logger_thread_last_lc && !logger_thread_last_lc->filled) {
- logger_chunk *lc = logger_thread_last_lc;
- lc->refcount++;
- w->lc = lc;
- w->flushed = lc->written; /* only write logs after we attach */
- w->chunks++;
+ /* Skip any currently queued data, so we only print new lines. */
+ if (bipbuf_peek_all(logger_thread_buf, &size) != NULL) {
+ w->flushed = size;
+ w->min_flushed = size;
}
watchers[x] = w;
watcher_count++;