summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2016-06-11 21:15:45 -0700
committerdormando <dormando@rydia.net>2016-06-11 21:15:45 -0700
commit9d9da4a2ea8fc92399c42390e9a679d2ea634c90 (patch)
treeb7f6fd08315b79a3c84406a87a5b0044972e557f
parent4b7228ed75aafc7428806f92bc3399f16b928a90 (diff)
downloadmemcached-logging_next.tar.gz
remove "logger chunks", add individualized streamslogging_next
Stop doing a multi-reader circular buffer structure on top of a circular buffer. Also adds individualized streams based off of the central buffer. Sadly this requires managing iovecs and dealing with partial writes into said iovecs. That makes things very complicated. Since it's not clear to me how to simplify it too much (as of this writing), one of the next commits should remove iovecs and instead give each watcher its own circular buffer. The parser thread will copy watched events into each buffer. The above would only be slower for the case of many watchers watching the same event streams. Given all of the extra loops required for managing the iovecs, and the more complicated syscall handling, it might even be the same speed to manage multiple buffers anyway. I completed this intermediary change since it simplifies the surrounding code and was educational to fiddle with iovecs again.
-rw-r--r--logger.c306
-rw-r--r--logger.h17
-rw-r--r--memcached.c8
3 files changed, 180 insertions, 151 deletions
diff --git a/logger.c b/logger.c
index 49f2383..3180e30 100644
--- a/logger.c
+++ b/logger.c
@@ -1,14 +1,26 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+/* need this to get IOV_MAX on some platforms. */
+#ifndef __need_IOV_MAX
+#define __need_IOV_MAX
+#endif
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
+#include <limits.h>
#include "memcached.h"
#include "bipbuffer.h"
-#define LOGGER_DEBUG 0
+/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
+#ifndef IOV_MAX
+#if defined(__FreeBSD__) || defined(__APPLE__) || defined(__GNU__)
+# define IOV_MAX 1024
+#endif
+#endif
+
+#define LOGGER_DEBUG 1
#if LOGGER_DEBUG
#define L_DEBUG(...) \
@@ -31,8 +43,6 @@ pthread_mutex_t logger_stack_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_key_t logger_key;
bipbuf_t *logger_thread_buf = NULL;
-logger_chunk *logger_thread_last_lc = NULL;
-int logger_thread_lc_count = 0;
#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t logger_atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -45,8 +55,8 @@ int watcher_count = 0;
/* Should this go somewhere else? */
static const entry_details default_entries[] = {
- [LOGGER_ASCII_CMD] = {LOGGER_TEXT_ENTRY, 512, "<%d %s"},
- [LOGGER_EVICTION] = {LOGGER_EVICTION_ENTRY, 512, "eviction: [key %s] [fetched: %s] [ttl: %d] [la: %d]"}
+ [LOGGER_ASCII_CMD] = {LOGGER_TEXT_ENTRY, 512, LOG_RAWCMDS, "<%d %s"},
+ [LOGGER_EVICTION] = {LOGGER_EVICTION_ENTRY, 512, LOG_EVICTIONS, "eviction: [key %s] [fetched: %s] [ttl: %d] [la: %d]"}
};
static void logger_poll_watchers(int force_poll);
@@ -106,83 +116,37 @@ static void logger_link_q(logger *l) {
return;
}*/
-/* Centralized log chunk buffer. All worker threads dequeue into this buffer,
- * which gets written out to subscribed watchers.
- * The "latest" global chunk is tracked.
- * New chunks get attached to each active watcher, increasing refcount.
- * Watchers flush from this buffer (tracking w->flushed) until buffer is full.
- * When full, a new buffer is created and chained after the old one (lc->next)
- * Watchers then catch up, releasing ref's on old chunks as they complete
- * flushing.
- * When a chunk is fully flushed by all watchers, it's released back to the
- * central bipbuf.
- * When totally out of chunk space, the most-behind watchers are closed.
- *
- * This fiddling is done to avoid excess memcpy'ing. We write directly into
- * the central bipbuf rather than copy into it, and multiple readers access
- * it.
- */
-static logger_chunk *logger_get_chunk(void) {
- logger_chunk *lc = logger_thread_last_lc;
- int x;
-
- if (lc && lc->filled == 0)
- return lc;
-
- lc = (logger_chunk *) bipbuf_request(logger_thread_buf, (8 * 1024));
- if (lc == NULL) {
- L_DEBUG("LOGGER: Chunk space full.\n");
- return NULL;
- }
- bipbuf_push(logger_thread_buf, (8 * 1024));
- L_DEBUG("LOGGER: Creating new chunk\n");
- memset(lc, 0, sizeof(logger_chunk));
- lc->size = (8 * 1024) - sizeof(logger_chunk);
-
- /* Each watcher gets a reference to this new chunk */
- for (x = 0; x < WATCHER_LIMIT; x++) {
- logger_watcher *w = watchers[x];
- if (w == NULL)
- continue;
-
- if (w->lc == NULL) {
- /* Watcher doesn't already have a chunk. */
- w->lc = lc;
- }
- lc->refcount++;
- w->chunks++;
- }
-
- if (logger_thread_last_lc) {
- assert(logger_thread_last_lc != lc);
- logger_thread_last_lc->next = lc;
- }
- logger_thread_last_lc = lc;
- logger_thread_lc_count++;
-
- return lc;
-}
-
/* return FULLBUF to tell caller buffer is full.
* caller will then flush and retry log parse
*/
-static enum logger_parse_entry_ret logger_parse_entry(logger_chunk *lc, logentry *e) {
+static enum logger_parse_entry_ret logger_parse_entry(bipbuf_t *buf, logentry *e) {
int total = 0;
+ /* "64" needs better definition, since we're extending the length of the
+ * logline a bit.
+ */
+ int esize = sizeof(logentry) + e->size + 64;
+ logentry *newe = (logentry *) bipbuf_request(buf, esize);
+ if (newe == NULL) {
+ return LOGGER_PARSE_ENTRY_FULLBUF;
+ }
+ /* Not bothering copying the whole struct header. will probably come up
+ * with a different shorter struct for this.
+ */
+ newe->watcher_flag = e->watcher_flag;
switch (e->event) {
case LOGGER_TEXT_ENTRY:
case LOGGER_EVICTION_ENTRY:
- total = snprintf(lc->data + lc->written, (lc->size - lc->written), "[%d.%d] [%llu] %s\n",
+ total = snprintf((char *) newe->data, esize, "[%d.%d] [%llu] %s\n",
(int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid, (char *) e->data);
- if (total >= (lc->size - lc->written) || total <= 0) {
- /* ran out of space. don't advance written, run a flush,
- * retry write?
- */
- L_DEBUG("LOGGER: Chunk filled\n");
- lc->filled = 1;
- return LOGGER_PARSE_ENTRY_FULLBUF;
+ if (total >= esize || total <= 0) {
+ /* FIXME: This is now a much more fatal error. need to make it
+ * not crash though. */
+ L_DEBUG("LOGGER: Failed to flatten log entry!\n");
+ return LOGGER_PARSE_ENTRY_FAILED;
} else {
- lc->written += total;
+ newe->size = total + 1;
+ bipbuf_push(buf, sizeof(logentry) + newe->size);
}
break;
}
@@ -190,19 +154,6 @@ static enum logger_parse_entry_ret logger_parse_entry(logger_chunk *lc, logentry
return LOGGER_PARSE_ENTRY_OK;
}
-static void logger_chunk_release(logger_chunk *lc) {
- lc->refcount--;
- if (lc->refcount == 0) {
- L_DEBUG("LOGGER: Releasing logger chunk\n");
- if (lc->next == NULL) {
- L_DEBUG("LOGGER: Clearing last LC chunk reference\n");
- logger_thread_last_lc = NULL;
- }
- bipbuf_poll(logger_thread_buf, (8 * 1024));
- logger_thread_lc_count--;
- }
-}
-
/* Called with logger stack locked.
* Iterates over every watcher collecting enabled flags.
*/
@@ -231,13 +182,7 @@ static void logger_set_flags(void) {
* processing.
*/
static void logger_close_watcher(logger_watcher *w) {
- logger_chunk *lc = w->lc;
L_DEBUG("LOGGER: Closing dead watcher\n");
- while (lc != NULL) {
- logger_chunk *nlc = lc->next;
- logger_chunk_release(lc);
- lc = nlc;
- }
watchers[w->id] = NULL;
sidethread_conn_close(w->c);
watcher_count--;
@@ -245,22 +190,56 @@ static void logger_close_watcher(logger_watcher *w) {
logger_set_flags();
}
+static void logger_push_central_buf(void) {
+ int x;
+ logger_watcher *w;
+ int min_flushed = INT_MAX;
+ /* If something is set into min_flushed, we're able to advance the central
+ * buffer forward by min_flushed bytes.
+ */
+ for (x = 0; x < WATCHER_LIMIT; x++) {
+ if (watchers[x] == NULL)
+ continue;
+ w = watchers[x];
+ if (w->min_flushed < min_flushed)
+ min_flushed = w->min_flushed;
+ }
+ if (min_flushed != 0) {
+ //L_DEBUG("LOGGER: min_flushed [%d], advancing central buffer\n", min_flushed);
+ for (x = 0; x < WATCHER_LIMIT; x++) {
+ if (watchers[x] == NULL)
+ continue;
+ assert(watchers[x]->flushed - min_flushed >= 0);
+ watchers[x]->flushed -= min_flushed;
+ watchers[x]->min_flushed -= min_flushed;
+ }
+ bipbuf_poll(logger_thread_buf, min_flushed);
+ }
+}
+
/* Pick any number of "oldest" behind-watchers and kill them. */
-static void logger_kill_watchers(const int count) {
+static void logger_kill_watchers(void) {
int x;
logger_watcher *w;
+ int min_flushed = INT_MAX;
+ int min_flushed_watcher = -1;
for (x = 0; x < WATCHER_LIMIT; x++) {
w = watchers[x];
if (w == NULL)
continue;
- if (w->lc && w->chunks == count) {
- L_DEBUG("LOGGER: Killing watcher [%d] because it's too behind (%d)\n",
- w->sfd, w->chunks);
- logger_close_watcher(w);
+ if (w->min_flushed < min_flushed) {
+ min_flushed = w->min_flushed;
+ min_flushed_watcher = x;
}
}
+ if (min_flushed_watcher > -1) {
+ fprintf(stderr, "LOGGER: Killing watcher [%d] because of low flush bytes (%d)\n",
+ watchers[min_flushed_watcher]->sfd, min_flushed);
+ logger_close_watcher(watchers[min_flushed_watcher]);
+ logger_push_central_buf();
+ }
}
/* Reads a particular worker thread's available bipbuf bytes. Parses each log
@@ -272,8 +251,8 @@ static int logger_thread_read(logger *l) {
unsigned int size;
unsigned int pos = 0;
unsigned char *data;
+ unsigned int was_full = 0;
logentry *e;
- logger_chunk *lc;
pthread_mutex_lock(&l->mutex);
data = bipbuf_peek_all(l->buf, &size);
pthread_mutex_unlock(&l->mutex);
@@ -281,27 +260,28 @@ static int logger_thread_read(logger *l) {
if (data == NULL) {
return 0;
}
- L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size);
+ //L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size);
/* parse buffer */
while (pos < size && watcher_count > 0) {
enum logger_parse_entry_ret ret;
e = (logentry *) (data + pos);
- lc = logger_get_chunk();
- if (lc == NULL) {
- /* out of buffer space. show must go on, so kill something. */
- logger_kill_watchers(logger_thread_lc_count);
- continue;
- }
- ret = logger_parse_entry(lc, e);
+ ret = logger_parse_entry(logger_thread_buf, e);
if (ret == LOGGER_PARSE_ENTRY_FULLBUF) {
/* Buffer was full. Push the last up, force an early flush. */
- logger_poll_watchers(0);
+ if (was_full) {
+ /* out of buffer space. show must go on, so kill something. */
+ logger_kill_watchers();
+ } else {
+ logger_poll_watchers(0);
+ was_full = 1;
+ }
continue;
} else if (ret != LOGGER_PARSE_ENTRY_OK) {
fprintf(stderr, "LOGGER: Failed to parse log entry\n");
abort();
}
+ was_full = 0;
pos += sizeof(logentry) + e->size;
}
assert(pos <= size);
@@ -342,24 +322,20 @@ static void logger_poll_watchers(int force_poll) {
int x;
int nfd = 0;
logger_watcher *w;
+ unsigned char *data;
+ unsigned int data_size = 0;
+ struct iovec iov[IOV_MAX];
+ int iovcnt = 0;
+ data = bipbuf_peek_all(logger_thread_buf, &data_size);
+ if (data == NULL && force_poll == 0)
+ return;
for (x = 0; x < WATCHER_LIMIT; x++) {
w = watchers[x];
if (w == NULL)
continue;
- /* If this chunk has been flushed, release it and start the next one.
- * Next chunk may be null, which stops writing. */
- if (w->lc != NULL && w->flushed >= w->lc->written && w->lc->filled) {
- logger_chunk *nlc = w->lc->next;
- logger_chunk_release(w->lc);
- /* More buffers to eat */
- w->lc = nlc;
- w->chunks--;
- w->flushed = 0;
- }
-
- if (w->lc != NULL && w->lc->written > w->flushed) {
+ if (data_size > w->flushed) {
watchers_pollfds[nfd].fd = w->sfd;
watchers_pollfds[nfd].events = POLLOUT;
nfd++;
@@ -370,10 +346,14 @@ static void logger_poll_watchers(int force_poll) {
}
}
+ /* FIXME: Verify if this is necessary. */
+ if (data != NULL && force_poll == 0)
+ assert(nfd != 0);
+
if (nfd == 0)
return;
- L_DEBUG("LOGGER: calling poll()\n");
+ L_DEBUG("LOGGER: calling poll() [data_size: %d]\n", data_size);
int ret = poll(watchers_pollfds, nfd, 0);
if (ret < 0) {
@@ -386,7 +366,6 @@ static void logger_poll_watchers(int force_poll) {
w = watchers[x];
if (w == NULL)
continue;
-
/* Early detection of a disconnect. Otherwise we have to wait until
* the next write
*/
@@ -399,40 +378,96 @@ static void logger_poll_watchers(int force_poll) {
continue;
}
}
- if (w->lc != NULL) {
+ if (data_size > w->flushed) {
if (watchers_pollfds[nfd].revents & (POLLHUP|POLLERR)) {
L_DEBUG("LOGGER: watcher closed during poll() call\n");
logger_close_watcher(w);
} else if (watchers_pollfds[nfd].revents & POLLOUT) {
int total = 0;
+ /* To account for a partial write into the iovec, pos should
+ * loop and skip logentry's until the next one would be
+ * larger, then set the iov_base as an offset into the
+ * remainder.
+ */
+ unsigned int pos = w->flushed;
+ iovcnt = 0;
+ while (pos < data_size) {
+ logentry *e = (logentry *) (data + pos);
+ int esize = sizeof(logentry) + e->size;
+ int flushed = 0;
+ if (pos + esize < w->flushed) {
+ pos += esize;
+ continue;
+ } else if (pos < w->flushed) {
+ /* We have a remainder. */
+ flushed = w->flushed - (pos + sizeof(logentry));
+ assert(pos + flushed <= w->flushed);
+ } else if ((e->watcher_flag & w->eflags) == 0) {
+ /* If we're ahead of flushed but we aren't listening,
+ * we advance flushed and skip this event
+ */
+ //L_DEBUG("LOGGER: Skipped an event for [%d] (eflags: %d) (watcher_flag: %d)\n",
+ // w->sfd, w->eflags, e->watcher_flag);
+ pos += esize;
+ w->flushed += esize;
+ w->min_flushed += esize;
+ continue;
+ }
+ iov[iovcnt].iov_base = (e->data + flushed);
+ iov[iovcnt].iov_len = e->size - flushed;
+ //L_DEBUG("LOGGER: incrementing pos [%d] by %d [esize: %d]\n", pos, esize, e->size);
+ pos += esize;
+ iovcnt++;
+ }
+
+ if (iovcnt == 0) {
+ /* Turns out we skipped all of the events! */
+ continue;
+ }
+
/* We can write a bit. */
switch (w->t) {
case LOGGER_WATCHER_STDERR:
- total = fwrite(w->lc->data + w->flushed, 1, w->lc->written - w->flushed, stderr);
+ total = writev(STDERR_FILENO, iov, iovcnt);
break;
case LOGGER_WATCHER_CLIENT:
- total = write(w->sfd, w->lc->data + w->flushed, w->lc->written - w->flushed);
+ total = writev(w->sfd, iov, iovcnt);
break;
}
- L_DEBUG("LOGGER: poll() wrote %d to %d (written: %d) (flushed: %d)\n", total, w->sfd,
- w->lc->written, w->flushed);
+ L_DEBUG("LOGGER: poll() wrote %d to %d (data_size: %d) (flushed: %d)\n", total, w->sfd,
+ data_size, w->flushed);
if (total == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
logger_close_watcher(w);
}
L_DEBUG("LOGGER: watcher hit EAGAIN\n");
- nfd++;
- continue;
} else if (total == 0) {
logger_close_watcher(w);
+ } else {
+ int rem = total;
+ int x;
+ for (x = 0; x < iovcnt; x++) {
+ rem -= iov[x].iov_len;
+ if (rem >= 0) {
+ w->flushed += iov[x].iov_len + sizeof(logentry);
+ } else {
+ L_DEBUG("LOGGER: remainder %d\n", rem);
+ /* We have a remainder. do count logentry
+ * struct at this time. */
+ rem = abs(rem) + sizeof(logentry);
+ w->flushed += rem;
+ break;
+ }
+ }
+ w->min_flushed = w->flushed - rem;
}
-
- w->flushed += total;
}
nfd++;
}
}
+
+ logger_push_central_buf();
}
#define MAX_LOGGER_SLEEP 100000
@@ -571,6 +606,7 @@ enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, cons
}
e->gid = gid;
e->event = d->subtype;
+ e->watcher_flag = d->watcher_flag;
SET_LOGGER_TIME();
switch (d->subtype) {
@@ -622,6 +658,7 @@ enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, cons
enum logger_add_watcher_ret logger_add_watcher(void *c, const int sfd, uint16_t f) {
int x;
logger_watcher *w = NULL;
+ unsigned int size = 0;
pthread_mutex_lock(&logger_stack_lock);
if (watcher_count >= WATCHER_LIMIT) {
return LOGGER_ADD_WATCHER_TOO_MANY;
@@ -644,13 +681,10 @@ enum logger_add_watcher_ret logger_add_watcher(void *c, const int sfd, uint16_t
}
w->id = x;
w->eflags = f;
- /* Attach to an existing log chunk if there is one */
- if (logger_thread_last_lc && !logger_thread_last_lc->filled) {
- logger_chunk *lc = logger_thread_last_lc;
- lc->refcount++;
- w->lc = lc;
- w->flushed = lc->written; /* only write logs after we attach */
- w->chunks++;
+ /* Skip any currently queued data, so we only print new lines. */
+ if (bipbuf_peek_all(logger_thread_buf, &size) != NULL) {
+ w->flushed = size;
+ w->min_flushed = size;
}
watchers[x] = w;
watcher_count++;
diff --git a/logger.h b/logger.h
index 31a246d..6cad6ca 100644
--- a/logger.h
+++ b/logger.h
@@ -28,17 +28,20 @@ enum logger_ret_type {
enum logger_parse_entry_ret {
LOGGER_PARSE_ENTRY_OK = 0,
- LOGGER_PARSE_ENTRY_FULLBUF
+ LOGGER_PARSE_ENTRY_FULLBUF,
+ LOGGER_PARSE_ENTRY_FAILED
};
typedef const struct {
enum log_entry_subtype subtype;
int reqlen;
+ uint16_t watcher_flag;
char *format;
} entry_details;
typedef struct _logentry {
enum log_entry_subtype event;
+ uint16_t watcher_flag;
uint64_t gid;
struct timeval tv; /* not monotonic! */
int size;
@@ -56,6 +59,7 @@ typedef struct _logentry {
#define LOG_EVICTIONS (1<<6) /* defailts of evicted items */
#define LOG_STRICT (1<<7) /* block worker instead of drop */
#define LOG_TIME (1<<8) /* log the entry time */
+#define LOG_RAWCMDS (1<<9) /* raw ascii commands */
typedef struct _logger {
struct _logger *prev;
@@ -71,15 +75,6 @@ typedef struct _logger {
const entry_details *entry_map;
} logger;
-typedef struct _logger_chunk {
- struct _logger_chunk *next;
- int size; /* max potential size */
- int written; /* amount written into the buffer (actual size) */
- int refcount; /* number of attached watchers */
- unsigned int filled :1; /* reached storage max */
- char data[];
-} logger_chunk;
-
enum logger_watcher_type {
LOGGER_WATCHER_STDERR = 0,
LOGGER_WATCHER_CLIENT = 1
@@ -87,10 +82,10 @@ enum logger_watcher_type {
typedef struct {
void *c; /* original connection structure. still with source thread attached */
- logger_chunk *lc;
int chunks; /* count of chunks stored up */
int sfd; /* client fd */
int flushed; /* backlog data flushed so far from active chunk */
+ int min_flushed; /* it's safe to flush the central buffer up to here */
int id; /* id number for watcher list */
enum logger_watcher_type t; /* stderr, client, syslog, etc */
uint16_t eflags; /* flags we are interested in */
diff --git a/memcached.c b/memcached.c
index a07d356..76dde89 100644
--- a/memcached.c
+++ b/memcached.c
@@ -3445,7 +3445,7 @@ static void process_command(conn *c, char *command) {
MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
- if (c->thread->l->eflags & LOG_FETCHERS)
+ if (c->thread->l->eflags & LOG_RAWCMDS)
logger_log(c->thread->l, LOGGER_ASCII_CMD, NULL, c->sfd, command);
/*
@@ -3681,15 +3681,15 @@ static void process_command(conn *c, char *command) {
/* TODO: pass to function for full argument processing. */
/* This is very temporary... need to decide on a real flag parser. */
if (ntokens == 3) {
- if ((strcmp(tokens[COMMAND_TOKEN + 1].value, "fetchers") == 0)) {
- f |= LOG_FETCHERS;
+ if ((strcmp(tokens[COMMAND_TOKEN + 1].value, "rawcmds") == 0)) {
+ f |= LOG_RAWCMDS;
} else if ((strcmp(tokens[COMMAND_TOKEN + 1].value, "evictions") == 0)) {
f |= LOG_EVICTIONS;
} else {
out_string(c, "ERROR");
}
} else {
- f |= LOG_FETCHERS;
+ f |= LOG_RAWCMDS;
}
f |= LOG_TIME; /* not optional yet */
switch(logger_add_watcher(c, c->sfd, f)) {