summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2015-11-18 01:49:53 -0800
committerdormando <dormando@rydia.net>2016-06-08 10:13:18 -0700
commit1af3c22bba5abfd38b49840c63237a9a59cbbc4e (patch)
tree710934c11cc8a94d26292804d96c5e3cf43400c3
parent9517c656723a769da63c29e23755b469de4417d2 (diff)
downloadmemcached-1af3c22bba5abfd38b49840c63237a9a59cbbc4e.tar.gz
initial logger code.
Logs are written to per-thread buffers. A new background thread aggregates the logs, further processes them, then writes them to any "watchers". Logs can have the time added to them, and all have a GID so they can be put back into strict order. This is an early preview. Code needs refactoring and a more complete set of options. All watchers are also stuck viewing the global feed of logs, even if they asked for different data. As of this commit there's no way to toggle the "stderr" watcher.
-rw-r--r--LICENSE.bipbuffer24
-rw-r--r--Makefile.am6
-rw-r--r--bipbuffer.c182
-rw-r--r--bipbuffer.h87
-rw-r--r--items.c4
-rw-r--r--logger.c637
-rw-r--r--logger.h114
-rw-r--r--memcached.c26
-rw-r--r--memcached.h5
-rw-r--r--thread.c46
10 files changed, 1126 insertions, 5 deletions
diff --git a/LICENSE.bipbuffer b/LICENSE.bipbuffer
new file mode 100644
index 0000000..d5bddd2
--- /dev/null
+++ b/LICENSE.bipbuffer
@@ -0,0 +1,24 @@
+Copyright (c) 2011, Willem-Hendrik Thiart
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+ * The names of its contributors may not be used to endorse or promote
+ products derived from this software without specific prior written
+ permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL WILLEM-HENDRIK THIART BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Makefile.am b/Makefile.am
index ba39e60..281e9af 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -18,7 +18,9 @@ memcached_SOURCES = memcached.c memcached.h \
thread.c daemon.c \
stats.c stats.h \
util.c util.h \
- trace.h cache.h sasl_defs.h
+ trace.h cache.h sasl_defs.h \
+ bipbuffer.c bipbuffer.h \
+ logger.c logger.h
if BUILD_CACHE
memcached_SOURCES += cache.c
@@ -71,7 +73,7 @@ memcached_debug_dtrace.o: $(memcached_debug_OBJECTS)
SUBDIRS = doc
DIST_DIRS = scripts
-EXTRA_DIST = doc scripts t memcached.spec memcached_dtrace.d version.m4 README.md
+EXTRA_DIST = doc scripts t memcached.spec memcached_dtrace.d version.m4 README.md LICENSE.bipbuffer
MOSTLYCLEANFILES = *.gcov *.gcno *.gcda *.tcov
diff --git a/bipbuffer.c b/bipbuffer.c
new file mode 100644
index 0000000..ee48c84
--- /dev/null
+++ b/bipbuffer.c
@@ -0,0 +1,182 @@
+/**
+ * Copyright (c) 2011, Willem-Hendrik Thiart
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE.bipbuffer file.
+ *
+ * @file
+ * @author Willem Thiart himself@willemthiart.com
+ */
+
+#include "stdio.h"
+#include <stdlib.h>
+
+/* for memcpy */
+#include <string.h>
+
+#include "bipbuffer.h"
+
+static size_t bipbuf_sizeof(const unsigned int size)
+{
+ return sizeof(bipbuf_t) + size;
+}
+
+int bipbuf_unused(const bipbuf_t* me)
+{
+ if (1 == me->b_inuse)
+ /* distance between region B and region A */
+ return me->a_start - me->b_end;
+ else
+ return me->size - me->a_end;
+}
+
+int bipbuf_size(const bipbuf_t* me)
+{
+ return me->size;
+}
+
+int bipbuf_used(const bipbuf_t* me)
+{
+ return (me->a_end - me->a_start) + me->b_end;
+}
+
+void bipbuf_init(bipbuf_t* me, const unsigned int size)
+{
+ me->a_start = me->a_end = me->b_end = 0;
+ me->size = size;
+ me->b_inuse = 0;
+}
+
+bipbuf_t *bipbuf_new(const unsigned int size)
+{
+ bipbuf_t *me = malloc(bipbuf_sizeof(size));
+ if (!me)
+ return NULL;
+ bipbuf_init(me, size);
+ return me;
+}
+
+void bipbuf_free(bipbuf_t* me)
+{
+ free(me);
+}
+
+int bipbuf_is_empty(const bipbuf_t* me)
+{
+ return me->a_start == me->a_end;
+}
+
+/* find out if we should turn on region B
+ * ie. is the distance from A to buffer's end less than B to A? */
+static void __check_for_switch_to_b(bipbuf_t* me)
+{
+ if (me->size - me->a_end < me->a_start - me->b_end)
+ me->b_inuse = 1;
+}
+
+/* TODO: DOCUMENT THESE TWO FUNCTIONS */
+unsigned char *bipbuf_request(bipbuf_t* me, const int size)
+{
+ if (bipbuf_unused(me) < size)
+ return 0;
+ if (1 == me->b_inuse)
+ {
+ return (unsigned char *)me->data + me->b_end;
+ }
+ else
+ {
+ return (unsigned char *)me->data + me->a_end;
+ }
+
+ return NULL;
+}
+
+int bipbuf_push(bipbuf_t* me, const int size)
+{
+ if (bipbuf_unused(me) < size)
+ return 0;
+
+ if (1 == me->b_inuse)
+ {
+ me->b_end += size;
+ }
+ else
+ {
+ me->a_end += size;
+ }
+
+ __check_for_switch_to_b(me);
+ return size;
+}
+
+int bipbuf_offer(bipbuf_t* me, const unsigned char *data, const int size)
+{
+ /* not enough space */
+ if (bipbuf_unused(me) < size)
+ return 0;
+
+ if (1 == me->b_inuse)
+ {
+ memcpy(me->data + me->b_end, data, size);
+ me->b_end += size;
+ }
+ else
+ {
+ memcpy(me->data + me->a_end, data, size);
+ me->a_end += size;
+ }
+
+ __check_for_switch_to_b(me);
+ return size;
+}
+
+unsigned char *bipbuf_peek(const bipbuf_t* me, const unsigned int size)
+{
+ /* make sure we can actually peek at this data */
+ if (me->size < me->a_start + size)
+ return NULL;
+
+ if (bipbuf_is_empty(me))
+ return NULL;
+
+ return (unsigned char *)me->data + me->a_start;
+}
+
+unsigned char *bipbuf_peek_all(const bipbuf_t* me, unsigned int *size)
+{
+ if (bipbuf_is_empty(me))
+ return NULL;
+
+ *size = me->a_end - me->a_start;
+ return (unsigned char*)me->data + me->a_start;
+}
+
+unsigned char *bipbuf_poll(bipbuf_t* me, const unsigned int size)
+{
+ if (bipbuf_is_empty(me))
+ return NULL;
+
+ /* make sure we can actually poll this data */
+ if (me->size < me->a_start + size)
+ return NULL;
+
+ void *end = me->data + me->a_start;
+ me->a_start += size;
+
+ /* we seem to be empty.. */
+ if (me->a_start == me->a_end)
+ {
+ /* replace a with region b */
+ if (1 == me->b_inuse)
+ {
+ me->a_start = 0;
+ me->a_end = me->b_end;
+ me->b_end = me->b_inuse = 0;
+ }
+ else
+ /* safely move cursor back to the start because we are empty */
+ me->a_start = me->a_end = 0;
+ }
+
+ __check_for_switch_to_b(me);
+ return end;
+}
diff --git a/bipbuffer.h b/bipbuffer.h
new file mode 100644
index 0000000..b6ee56e
--- /dev/null
+++ b/bipbuffer.h
@@ -0,0 +1,87 @@
+#ifndef BIPBUFFER_H
+#define BIPBUFFER_H
+
+typedef struct
+{
+ unsigned long int size;
+
+ /* region A */
+ unsigned int a_start, a_end;
+
+ /* region B */
+ unsigned int b_end;
+
+ /* is B inuse? */
+ int b_inuse;
+
+ unsigned char data[];
+} bipbuf_t;
+
+/**
+ * Create a new bip buffer.
+ *
+ * malloc()s space
+ *
+ * @param[in] size The size of the buffer */
+bipbuf_t *bipbuf_new(const unsigned int size);
+
+/**
+ * Initialise a bip buffer. Use memory provided by user.
+ *
+ * No malloc()s are performed.
+ *
+ * @param[in] size The size of the array */
+void bipbuf_init(bipbuf_t* me, const unsigned int size);
+
+/**
+ * Free the bip buffer */
+void bipbuf_free(bipbuf_t *me);
+
+/* TODO: DOCUMENTATION */
+unsigned char *bipbuf_request(bipbuf_t* me, const int size);
+int bipbuf_push(bipbuf_t* me, const int size);
+
+/**
+ * @param[in] data The data to be offered to the buffer
+ * @param[in] size The size of the data to be offered
+ * @return number of bytes offered */
+int bipbuf_offer(bipbuf_t *me, const unsigned char *data, const int size);
+
+/**
+ * Look at data. Don't move cursor
+ *
+ * @param[in] len The length of the data to be peeked
+ * @return data on success, NULL if we can't peek at this much data */
+unsigned char *bipbuf_peek(const bipbuf_t* me, const unsigned int len);
+
+/**
+ * Look at data. Don't move cursor
+ *
+ * @param[in] len The length of the data returned
+ * @return data on success, NULL if nothing available */
+unsigned char *bipbuf_peek_all(const bipbuf_t* me, unsigned int *len);
+
+/**
+ * Get pointer to data to read. Move the cursor on.
+ *
+ * @param[in] len The length of the data to be polled
+ * @return pointer to data, NULL if we can't poll this much data */
+unsigned char *bipbuf_poll(bipbuf_t* me, const unsigned int size);
+
+/**
+ * @return the size of the bipbuffer */
+int bipbuf_size(const bipbuf_t* me);
+
+/**
+ * @return 1 if buffer is empty; 0 otherwise */
+int bipbuf_is_empty(const bipbuf_t* me);
+
+/**
+ * @return how much space we have assigned */
+int bipbuf_used(const bipbuf_t* cb);
+
+/**
+ * @return bytes of unused space */
+int bipbuf_unused(const bipbuf_t* me);
+
+#endif /* BIPBUFFER_H */
diff --git a/items.c b/items.c
index ddf712d..0586313 100644
--- a/items.c
+++ b/items.c
@@ -841,10 +841,12 @@ static int lru_pull_tail(const int orig_id, const int cur_lru,
case COLD_LRU:
it = search; /* No matter what, we're stopping */
if (do_evict) {
+ logger *l;
if (settings.evict_to_free == 0) {
/* Don't think we need a counter for this. It'll OOM. */
break;
}
+ l = GET_LOGGER();
itemstats[id].evicted++;
itemstats[id].evicted_time = current_time - search->time;
if (search->exptime != 0)
@@ -852,6 +854,8 @@ static int lru_pull_tail(const int orig_id, const int cur_lru,
if ((search->it_flags & ITEM_FETCHED) == 0) {
itemstats[id].evicted_unfetched++;
}
+ if (l->log_evictions)
+ logger_log(l, LOGGER_EVICTION, search);
do_item_unlink_nolock(search, hv);
removed++;
if (settings.slab_automove == 2) {
diff --git a/logger.c b/logger.c
new file mode 100644
index 0000000..8888b1e
--- /dev/null
+++ b/logger.c
@@ -0,0 +1,637 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <poll.h>
+
+#include "memcached.h"
+#include "bipbuffer.h"
+
+#define LOGGER_DEBUG 0
+
+#if LOGGER_DEBUG
+#define L_DEBUG(...) \
+ do { \
+ fprintf(stderr, __VA_ARGS__); \
+ } while (0)
+#else
+#define L_DEBUG(...)
+#endif
+
+
+/* TODO: put this in a struct and ditch the global vars. */
+static logger *logger_stack_head = NULL;
+static logger *logger_stack_tail = NULL;
+static unsigned int logger_count = 0;
+static volatile int do_run_logger_thread = 1;
+static pthread_t logger_tid;
+pthread_mutex_t logger_stack_lock = PTHREAD_MUTEX_INITIALIZER;
+
+pthread_key_t logger_key;
+
+bipbuf_t *logger_thread_buf = NULL;
+logger_chunk *logger_thread_last_lc = NULL;
+int logger_thread_lc_count = 0;
+
+#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
+pthread_mutex_t logger_atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
+#endif
+
+#define WATCHER_LIMIT 20
+logger_watcher *watchers[20];
+struct pollfd watchers_pollfds[20];
+int watcher_count = 0;
+
+/* Should this go somewhere else? */
+static const entry_details default_entries[] = {
+ [LOGGER_ASCII_CMD] = {LOGGER_TEXT_ENTRY, 512, "<%d %s"},
+ [LOGGER_EVICTION] = {LOGGER_EVICTION_ENTRY, 512, "eviction: [key %s] [fetched: %s] [ttl: %d] [la: %d]"}
+};
+
+static void logger_poll_watchers(int force_poll);
+
+/* Logger GID's can be used by watchers to put logs back into strict order
+ */
+static uint64_t logger_get_gid(void) {
+ static uint64_t logger_gid = 0;
+#ifdef HAVE_GCC_ATOMICS
+ return __sync_add_and_fetch(&logger_gid, 1);
+#elif defined(__sun)
+ return atomic_inc_64_nv(&logger_gid);
+#else
+ mutex_lock(&logger_atomics_mutex);
+ uint64_t res = ++logger_gid;
+ mutex_unlock(&logger_atomics_mutex);
+ return res;
+#endif
+}
+
+/* TODO: genericize lists. would be nice to import queue.h if the impact is
+ * studied... otherwise can just write a local one.
+ */
+/* Add to the list of threads with a logger object */
+static void logger_link_q(logger *l) {
+ pthread_mutex_lock(&logger_stack_lock);
+ assert(l != logger_stack_head);
+
+ l->prev = 0;
+ l->next = logger_stack_head;
+ if (l->next) l->next->prev = l;
+ logger_stack_head = l;
+ if (logger_stack_tail == 0) logger_stack_tail = l;
+ logger_count++;
+ pthread_mutex_unlock(&logger_stack_lock);
+ return;
+}
+
+/* Remove from the list of threads with a logger object */
+/*static void logger_unlink_q(logger *l) {
+ pthread_mutex_lock(&logger_stack_lock);
+ if (logger_stack_head == l) {
+ assert(l->prev == 0);
+ logger_stack_head = l->next;
+ }
+ if (logger_stack_tail == l) {
+ assert(l->next == 0);
+ logger_stack_tail = l->prev;
+ }
+ assert(l->next != l);
+ assert(l->prev != l);
+
+ if (l->next) l->next->prev = l->prev;
+ if (l->prev) l->prev->next = l->next;
+ logger_count--;
+ pthread_mutex_unlock(&logger_stack_lock);
+ return;
+}*/
+
+/* Centralized log chunk buffer. All worker threads dequeue into this buffer,
+ * which gets written out to subscribed watchers.
+ * The "latest" global chunk is tracked.
+ * New chunks get attached to each active watcher, increasing refcount.
+ * Watchers flush from this buffer (tracking w->flushed) until buffer is full.
+ * When full, a new buffer is created and chained after the old one (lc->next)
+ * Watchers then catch up, releasing ref's on old chunks as they complete
+ * flushing.
+ * When a chunk is fully flushed by all watchers, it's released back to the
+ * central bipbuf.
+ * When totally out of chunk space, the most-behind watchers are closed.
+ *
+ * This fiddling is done to avoid excess memcpy'ing. We write directly into
+ * the central bipbuf rather than copy into it, and multiple readers access
+ * it.
+ */
+static logger_chunk *logger_get_chunk(void) {
+ logger_chunk *lc = logger_thread_last_lc;
+ int x;
+
+ if (lc && lc->filled == 0)
+ return lc;
+
+ lc = (logger_chunk *) bipbuf_request(logger_thread_buf, (8 * 1024));
+ if (lc == NULL) {
+ L_DEBUG("LOGGER: Chunk space full.\n");
+ return NULL;
+ }
+ bipbuf_push(logger_thread_buf, (8 * 1024));
+ L_DEBUG("LOGGER: Creating new chunk\n");
+ memset(lc, 0, sizeof(logger_chunk));
+ lc->size = (8 * 1024) - sizeof(logger_chunk);
+
+ /* Each watcher gets a reference to this new chunk */
+ for (x = 0; x < WATCHER_LIMIT; x++) {
+ logger_watcher *w = watchers[x];
+ if (w == NULL)
+ continue;
+
+ if (w->lc == NULL) {
+ /* Watcher doesn't already have a chunk. */
+ w->lc = lc;
+ }
+ lc->refcount++;
+ w->chunks++;
+ }
+
+ if (logger_thread_last_lc) {
+ assert(logger_thread_last_lc != lc);
+ logger_thread_last_lc->next = lc;
+ }
+ logger_thread_last_lc = lc;
+ logger_thread_lc_count++;
+
+ return lc;
+}
+
+/* return FULLBUF to tell caller buffer is full.
+ * caller will then flush and retry log parse
+ */
+static enum logger_parse_entry_ret logger_parse_entry(logger_chunk *lc, logentry *e) {
+ int total = 0;
+
+ switch (e->event) {
+ case LOGGER_TEXT_ENTRY:
+ case LOGGER_EVICTION_ENTRY:
+ total = snprintf(lc->data + lc->written, (lc->size - lc->written), "[%d.%d] [%llu] %s\n",
+ (int)e->tv.tv_sec, (int)e->tv.tv_usec, (unsigned long long) e->gid, (char *) e->data);
+ if (total >= (lc->size - lc->written) || total <= 0) {
+ /* ran out of space. don't advance written, run a flush,
+ * retry write?
+ */
+ L_DEBUG("LOGGER: Chunk filled\n");
+ lc->filled = 1;
+ return LOGGER_PARSE_ENTRY_FULLBUF;
+ } else {
+ lc->written += total;
+ }
+ break;
+ }
+
+ return LOGGER_PARSE_ENTRY_OK;
+}
+
+static void logger_chunk_release(logger_chunk *lc) {
+ lc->refcount--;
+ if (lc->refcount == 0) {
+ L_DEBUG("LOGGER: Releasing logger chunk\n");
+ if (lc->next == NULL) {
+ L_DEBUG("LOGGER: Clearing last LC chunk reference\n");
+ logger_thread_last_lc = NULL;
+ }
+ bipbuf_poll(logger_thread_buf, (8 * 1024));
+ logger_thread_lc_count--;
+ }
+}
+
+/* Called with logger stack locked.
+ * Releases every chunk associated with a watcher and closes the connection.
+ * We can't presently send a connection back to the worker for further
+ * processing.
+ */
+static void logger_close_watcher(logger_watcher *w) {
+ logger_chunk *lc = w->lc;
+ L_DEBUG("LOGGER: Closing dead watcher\n");
+ while (lc != NULL) {
+ logger_chunk *nlc = lc->next;
+ logger_chunk_release(lc);
+ lc = nlc;
+ }
+ watchers[w->id] = NULL;
+ sidethread_conn_close(w->c);
+ watcher_count--;
+ free(w);
+}
+
+/* Pick any number of "oldest" behind-watchers and kill them. */
+static void logger_kill_watchers(const int count) {
+ int x;
+ logger_watcher *w;
+
+ for (x = 0; x < WATCHER_LIMIT; x++) {
+ w = watchers[x];
+ if (w == NULL)
+ continue;
+
+ if (w->lc && w->chunks == count) {
+ L_DEBUG("LOGGER: Killing watcher [%d] because it's too behind (%d)\n",
+ w->sfd, w->chunks);
+ logger_close_watcher(w);
+ }
+ }
+}
+
+/* Reads a particular worker thread's available bipbuf bytes. Parses each log
+ * entry into the central logging output buffer.
+ * If we run out of buffer space, we simply kill off the most-behind watcher.
+ * Might be better to have options for dropping logs vs disconnecting?
+ */
+static int logger_thread_read(logger *l) {
+ unsigned int size;
+ unsigned int pos = 0;
+ unsigned char *data;
+ logentry *e;
+ logger_chunk *lc;
+ pthread_mutex_lock(&l->mutex);
+ data = bipbuf_peek_all(l->buf, &size);
+ pthread_mutex_unlock(&l->mutex);
+
+ if (data == NULL) {
+ return 0;
+ }
+ L_DEBUG("LOGGER: Got %d bytes from bipbuffer\n", size);
+
+ /* parse buffer */
+ while (pos < size && watcher_count > 0) {
+ enum logger_parse_entry_ret ret;
+ e = (logentry *) (data + pos);
+ lc = logger_get_chunk();
+ if (lc == NULL) {
+ /* out of buffer space. show must go on, so kill something. */
+ logger_kill_watchers(logger_thread_lc_count);
+ continue;
+ }
+ ret = logger_parse_entry(lc, e);
+ if (ret == LOGGER_PARSE_ENTRY_FULLBUF) {
+ /* Buffer was full. Push the last up, force an early flush. */
+ logger_poll_watchers(0);
+ continue;
+ } else if (ret != LOGGER_PARSE_ENTRY_OK) {
+ fprintf(stderr, "LOGGER: Failed to parse log entry\n");
+ abort();
+ }
+ pos += sizeof(logentry) + e->size;
+ }
+ assert(pos <= size);
+
+ pthread_mutex_lock(&l->mutex);
+ data = bipbuf_poll(l->buf, size);
+ pthread_mutex_unlock(&l->mutex);
+ if (data == NULL) {
+ fprintf(stderr, "LOGGER: unexpectedly couldn't advance buf pointer\n");
+ abort();
+ return -1;
+ }
+ return size; /* maybe the count of objects iterated? */
+}
+
+/* helper function or #define: iterate over loggers */
+/* called with logger_stack_lock held */
+static int logger_iterate(void) {
+ logger *l = NULL;
+ int count = 0;
+ for (l = logger_stack_head; l != NULL; l=l->next) {
+ /* lock logger, call function to manipulate it */
+ count += logger_thread_read(l);
+ }
+ return count;
+}
+
+/* Since the event loop code isn't reusable without a refactor, and we have a
+ * limited number of potential watchers, we run our own poll loop.
+ * This calls poll() unnecessarily during write flushes, should be possible to
+ * micro-optimize later.
+ *
+ * This flushes buffers attached to watchers, iterating through the chunks set
+ * to each worker. Also checks for readability in case client connection was
+ * closed.
+ */
+static void logger_poll_watchers(int force_poll) {
+ int x;
+ int nfd = 0;
+ logger_watcher *w;
+
+ for (x = 0; x < WATCHER_LIMIT; x++) {
+ w = watchers[x];
+ if (w == NULL)
+ continue;
+
+ /* If this chunk has been flushed, release it and start the next one.
+ * Next chunk may be null, which stops writing. */
+ if (w->lc != NULL && w->flushed >= w->lc->written && w->lc->filled) {
+ logger_chunk *nlc = w->lc->next;
+ logger_chunk_release(w->lc);
+ /* More buffers to eat */
+ w->lc = nlc;
+ w->chunks--;
+ w->flushed = 0;
+ }
+
+ if (w->lc != NULL && w->lc->written > w->flushed) {
+ watchers_pollfds[nfd].fd = w->sfd;
+ watchers_pollfds[nfd].events = POLLOUT;
+ nfd++;
+ } else if (force_poll) {
+ watchers_pollfds[nfd].fd = w->sfd;
+ watchers_pollfds[nfd].events = POLLIN;
+ nfd++;
+ }
+ }
+
+ if (nfd == 0)
+ return;
+
+ L_DEBUG("LOGGER: calling poll()\n");
+ int ret = poll(watchers_pollfds, nfd, 0);
+
+ if (ret < 0) {
+ perror("something failed with logger thread watcher fd polling");
+ return;
+ }
+
+ nfd = 0;
+ for (x = 0; x < WATCHER_LIMIT; x++) {
+ w = watchers[x];
+ if (w == NULL)
+ continue;
+
+ /* Early detection of a disconnect. Otherwise we have to wait until
+ * the next write
+ */
+ if (watchers_pollfds[nfd].revents & POLLIN) {
+ char buf[1];
+ int res = read(w->sfd, buf, 1);
+ if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK))) {
+ logger_close_watcher(w);
+ nfd++;
+ continue;
+ }
+ }
+ if (w->lc != NULL) {
+ if (watchers_pollfds[nfd].revents & (POLLHUP|POLLERR)) {
+ L_DEBUG("LOGGER: watcher closed during poll() call\n");
+ logger_close_watcher(w);
+ } else if (watchers_pollfds[nfd].revents & POLLOUT) {
+ int total = 0;
+ /* We can write a bit. */
+ switch (w->t) {
+ case LOGGER_WATCHER_STDERR:
+ total = fwrite(w->lc->data + w->flushed, 1, w->lc->written - w->flushed, stderr);
+ break;
+ case LOGGER_WATCHER_CLIENT:
+ total = write(w->sfd, w->lc->data + w->flushed, w->lc->written - w->flushed);
+ break;
+ }
+
+ L_DEBUG("LOGGER: poll() wrote %d to %d (written: %d) (flushed: %d)\n", total, w->sfd,
+ w->lc->written, w->flushed);
+ if (total == -1) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ logger_close_watcher(w);
+ }
+ L_DEBUG("LOGGER: watcher hit EAGAIN\n");
+ nfd++;
+ continue;
+ } else if (total == 0) {
+ logger_close_watcher(w);
+ }
+
+ w->flushed += total;
+ }
+ nfd++;
+ }
+ }
+}
+
+#define MAX_LOGGER_SLEEP 100000
+#define MIN_LOGGER_SLEEP 0
+
+/* Primary logger thread routine */
+static void *logger_thread(void *arg) {
+ useconds_t to_sleep = MIN_LOGGER_SLEEP;
+ L_DEBUG("LOGGER: Starting logger thread\n");
+ while (do_run_logger_thread) {
+ int found_logs = 0;
+ if (to_sleep)
+ usleep(to_sleep);
+
+ /* Call function to iterate each logger. */
+ pthread_mutex_lock(&logger_stack_lock);
+ /* check poll() for current slow watchers */
+ found_logs = logger_iterate();
+ logger_poll_watchers(1);
+ pthread_mutex_unlock(&logger_stack_lock);
+
+ /* TODO: abstract into a function and share with lru_crawler */
+ if (!found_logs) {
+ if (to_sleep < MAX_LOGGER_SLEEP)
+ to_sleep += 50;
+ } else {
+ to_sleep /= 2;
+ if (to_sleep < 50)
+ to_sleep = MIN_LOGGER_SLEEP;
+ }
+ }
+
+ return NULL;
+}
+
+static int start_logger_thread(void) {
+ int ret;
+ do_run_logger_thread = 1;
+ if ((ret = pthread_create(&logger_tid, NULL,
+ logger_thread, NULL)) != 0) {
+ fprintf(stderr, "Can't start logger thread: %s\n", strerror(ret));
+ return -1;
+ }
+ return 0;
+}
+
+// future.
+/*static int stop_logger_thread(void) {
+ do_run_logger_thread = 0;
+ pthread_join(logger_tid, NULL);
+ return 0;
+}*/
+
+/* Global logger thread start/init */
+void logger_init(void) {
+ /* TODO: auto destructor when threads exit */
+ /* TODO: error handling */
+
+ /* init stack for iterating loggers */
+ logger_stack_head = 0;
+ logger_stack_tail = 0;
+ pthread_key_create(&logger_key, NULL);
+
+ logger_thread_buf = bipbuf_new(LOGGER_THREAD_BUF_SIZE);
+ if (logger_thread_buf == NULL) {
+ abort();
+ }
+
+ if (start_logger_thread() != 0) {
+ abort();
+ }
+ /* FIXME: temp hack to always add STDERR watcher */
+ //logger_add_watcher(NULL, 0);
+ return;
+}
+
+/* called *from* the thread using a logger.
+ * initializes the per-thread bipbuf, links it into the list of loggers
+ */
+logger *logger_create(void) {
+ L_DEBUG("LOGGER: Creating and linking new logger instance\n");
+ logger *l = calloc(1, sizeof(logger));
+ if (l == NULL) {
+ return NULL;
+ }
+
+ l->buf = bipbuf_new(LOGGER_BUF_SIZE);
+ if (l->buf == NULL) {
+ free(l);
+ return NULL;
+ }
+
+ /* FIXME: hardcoded defaults for testing stage */
+ l->entry_map = default_entries;
+ //l->log_fetchers = 1;
+ l->log_evictions = 1;
+ l->log_time = 1;
+
+ pthread_mutex_init(&l->mutex, NULL);
+ pthread_setspecific(logger_key, l);
+
+ /* add to list of loggers */
+ logger_link_q(l);
+ return l;
+}
+
+#define SET_LOGGER_TIME() \
+ do { \
+ if (l->log_time) { \
+ gettimeofday(&e->tv, NULL); \
+ } \
+ } while (0)
+
+/* Public function for logging an entry.
+ * Tries to encapsulate as much of the formatting as possible to simplify the
+ * caller's code.
+ */
+enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, const void *entry, ...) {
+ bipbuf_t *buf = l->buf;
+ bool nospace = false;
+ va_list ap;
+ int total = 0;
+ logentry *e;
+ char scratch[512]; /* some local scratch space */
+ item *it;
+
+ const entry_details *d = &l->entry_map[event];
+ int reqlen = d->reqlen;
+ uint64_t gid = logger_get_gid();
+
+ pthread_mutex_lock(&l->mutex);
+ /* Request a maximum length of data to write to */
+ e = (logentry *) bipbuf_request(buf, (sizeof(logentry) + reqlen));
+ if (e == NULL) {
+ pthread_mutex_unlock(&l->mutex);
+ return LOGGER_RET_NOSPACE;
+ }
+ e->gid = gid;
+ e->event = d->subtype;
+ SET_LOGGER_TIME();
+
+ switch (d->subtype) {
+ case LOGGER_TEXT_ENTRY:
+ va_start(ap, entry);
+ total = vsnprintf((char *) e->data, reqlen, d->format, ap);
+ va_end(ap);
+ if (total >= reqlen || total <= 0) {
+ fprintf(stderr, "LOGGER: Failed to vsnprintf a text entry: (total) %d\n", total);
+ break;
+ }
+ e->size = total + 1; /* null byte */
+
+ break;
+ case LOGGER_EVICTION_ENTRY:
+ it = (item *)entry;
+ memcpy(scratch, ITEM_key(it), it->nkey);
+ scratch[it->nkey] = '\0';
+ total = snprintf((char *) e->data, reqlen, d->format, scratch,
+ (it->it_flags & ITEM_FETCHED) ? "yes" : "no",
+ (it->exptime > 0) ? (it->exptime - current_time) : -1,
+ (current_time - it->time));
+ e->size = total + 1;
+
+ break;
+ }
+
+ /* Push pointer forward by the actual amount required */
+ if (bipbuf_push(buf, (sizeof(logentry) + e->size)) == 0) {
+ fprintf(stderr, "LOGGER: Failed to bipbuf push a text entry\n");
+ pthread_mutex_unlock(&l->mutex);
+ return LOGGER_RET_ERR;
+ }
+ L_DEBUG("LOGGER: Requested %d bytes, wrote %d bytes\n", reqlen, total + 1);
+
+ pthread_mutex_unlock(&l->mutex);
+
+ if (nospace) {
+ return LOGGER_RET_NOSPACE;
+ } else {
+ return LOGGER_RET_OK;
+ }
+}
+
+/* Passes a client connection socket from a primary worker thread to the
+ * logger thread. Caller *must* event_del() the client before handing it over.
+ * Presently there's no way to hand the client back to the worker thread.
+ */
+enum logger_add_watcher_ret logger_add_watcher(void *c, int sfd) {
+ int x;
+ logger_watcher *w = NULL;
+ pthread_mutex_lock(&logger_stack_lock);
+ if (watcher_count >= WATCHER_LIMIT) {
+ return LOGGER_ADD_WATCHER_TOO_MANY;
+ }
+
+ for (x = 0; x < WATCHER_LIMIT; x++) {
+ if (watchers[x] == NULL)
+ break;
+ }
+
+ w = calloc(1, sizeof(logger_watcher));
+ if (w == NULL)
+ return LOGGER_ADD_WATCHER_FAILED;
+ w->c = c;
+ w->sfd = sfd;
+ if (sfd == 0 && c == NULL) {
+ w->t = LOGGER_WATCHER_STDERR;
+ } else {
+ w->t = LOGGER_WATCHER_CLIENT;
+ }
+ w->id = x;
+ /* Attach to an existing log chunk if there is one */
+ if (logger_thread_last_lc && !logger_thread_last_lc->filled) {
+ logger_chunk *lc = logger_thread_last_lc;
+ lc->refcount++;
+ w->lc = lc;
+ w->flushed = lc->written; /* only write logs after we attach */
+ w->chunks++;
+ }
+ watchers[x] = w;
+ watcher_count++;
+
+ pthread_mutex_unlock(&logger_stack_lock);
+ return LOGGER_ADD_WATCHER_OK;
+}
diff --git a/logger.h b/logger.h
new file mode 100644
index 0000000..3756f0f
--- /dev/null
+++ b/logger.h
@@ -0,0 +1,114 @@
+/* logging functions */
+#ifndef LOGGER_H
+#define LOGGER_H
+
+#include "bipbuffer.h"
+
+/* TODO: starttime tunable */
+#define LOGGER_BUF_SIZE 1024 * 64
+#define LOGGER_THREAD_BUF_SIZE 1024 * 256
+#define LOGGER_ENTRY_MAX_SIZE 2048
+#define GET_LOGGER() ((logger *) pthread_getspecific(logger_key));
+
+enum log_entry_type {
+ LOGGER_ASCII_CMD = 0,
+ LOGGER_EVICTION
+};
+
+enum log_entry_subtype {
+ LOGGER_TEXT_ENTRY = 0,
+ LOGGER_EVICTION_ENTRY
+};
+
+enum logger_ret_type {
+ LOGGER_RET_OK = 0,
+ LOGGER_RET_NOSPACE,
+ LOGGER_RET_ERR
+};
+
+enum logger_parse_entry_ret {
+ LOGGER_PARSE_ENTRY_OK = 0,
+ LOGGER_PARSE_ENTRY_FULLBUF
+};
+
+typedef const struct {
+ enum log_entry_subtype subtype;
+ int reqlen;
+ char *format;
+} entry_details;
+
+typedef struct _logentry {
+ enum log_entry_subtype event;
+ uint64_t gid;
+ struct timeval tv; /* not monotonic! */
+ int size;
+ union {
+ void *entry; /* probably an item */
+ char end;
+ } data[];
+} logentry;
+
+typedef struct _logger {
+ struct _logger *prev;
+ struct _logger *next;
+ pthread_mutex_t mutex; /* guard for this + *buf */
+ uint64_t logged; /* entries written to the buffer */
+ uint64_t dropped; /* entries dropped */
+ uint64_t blocked; /* times blocked instead of dropped */
+ uint16_t fetcher_ratio; /* log one out of every N fetches */
+ uint16_t mutation_ratio; /* log one out of every N mutations */
+ unsigned int log_sysevents :1; /* threads start/stop/working */
+ unsigned int log_fetchers :1; /* get/gets/etc */
+ unsigned int log_mutations :1; /* set/append/incr/etc */
+ unsigned int log_syserrors :1; /* malloc/etc errors */
+ unsigned int log_connevents :1; /* new client, closed, etc */
+ unsigned int log_vconnevents :1; /* client state changes */
+ unsigned int log_evictions :1; /* log details of evicted items */
+ unsigned int log_strict :1; /* block instead of drop */
+ unsigned int log_time :1; /* log the time the entry is created */
+ bipbuf_t *buf;
+ const entry_details *entry_map;
+} logger;
+
+typedef struct _logger_chunk {
+ struct _logger_chunk *next;
+ int size; /* max potential size */
+ int written; /* amount written into the buffer (actual size) */
+ int refcount; /* number of attached watchers */
+ unsigned int filled :1; /* reached storage max */
+ char data[];
+} logger_chunk;
+
+enum logger_watcher_type {
+ LOGGER_WATCHER_STDERR = 0,
+ LOGGER_WATCHER_CLIENT = 1
+};
+
+typedef struct {
+ void *c; /* original connection structure. still with source thread attached */
+ logger_chunk *lc;
+ int chunks; /* count of chunks stored up */
+ int sfd; /* client fd */
+ int flushed; /* backlog data flushed so far from active chunk */
+ int id; /* id number for watcher list */
+ enum logger_watcher_type t; /* stderr, client, syslog, etc */
+} logger_watcher;
+
+extern pthread_key_t logger_key;
+
+/* public functions */
+
+void logger_init(void);
+logger *logger_create(void);
+
+enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, const void *entry, ...);
+
+enum logger_add_watcher_ret {
+ LOGGER_ADD_WATCHER_TOO_MANY = 0,
+ LOGGER_ADD_WATCHER_OK,
+ LOGGER_ADD_WATCHER_FAILED
+};
+
+enum logger_add_watcher_ret logger_add_watcher(void *c, int sfd);
+
+#endif
diff --git a/memcached.c b/memcached.c
index 61322e9..0125075 100644
--- a/memcached.c
+++ b/memcached.c
@@ -670,7 +670,8 @@ static const char *state_text(enum conn_states state) {
"conn_swallow",
"conn_closing",
"conn_mwrite",
- "conn_closed" };
+ "conn_closed",
+ "conn_watch" };
return statenames[state];
}
@@ -3444,8 +3445,8 @@ static void process_command(conn *c, char *command) {
MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
- if (settings.verbose > 1)
- fprintf(stderr, "<%d %s\n", c->sfd, command);
+ if (c->thread->l->log_fetchers)
+ logger_log(c->thread->l, LOGGER_ASCII_CMD, NULL, c->sfd, command);
/*
* for commands set/add/replace, we build an item and read the data
@@ -3675,6 +3676,20 @@ static void process_command(conn *c, char *command) {
} else {
out_string(c, "ERROR");
}
+ } else if (ntokens > 1 && strcmp(tokens[COMMAND_TOKEN].value, "watch") == 0) {
+ /* TODO: pass to function for full argument processing. */
+ switch(logger_add_watcher(c, c->sfd)) {
+ case LOGGER_ADD_WATCHER_TOO_MANY:
+ out_string(c, "WATCHER_TOO_MANY log watcher limit reached");
+ break;
+ case LOGGER_ADD_WATCHER_FAILED:
+ out_string(c, "WATCHER_FAILED failed to add log watcher");
+ break;
+ case LOGGER_ADD_WATCHER_OK:
+ conn_set_state(c, conn_watch);
+ event_del(&c->event);
+ break;
+ }
} else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
process_verbosity_command(c, tokens, ntokens);
} else {
@@ -4380,6 +4395,10 @@ static void drive_machine(conn *c) {
abort();
break;
+ case conn_watch:
+ /* We handed off our connection to the logger thread. */
+ stop = true;
+ break;
case conn_max_state:
assert(false);
break;
@@ -5674,6 +5693,7 @@ int main (int argc, char **argv) {
main_base = event_init();
/* initialize other stuff */
+ logger_init();
stats_init();
assoc_init(settings.hashpower_init);
conn_init();
diff --git a/memcached.h b/memcached.h
index 445e122..0b39a66 100644
--- a/memcached.h
+++ b/memcached.h
@@ -17,9 +17,11 @@
#include <netdb.h>
#include <pthread.h>
#include <unistd.h>
+#include <assert.h>
#include "protocol_binary.h"
#include "cache.h"
+#include "logger.h"
#include "sasl_defs.h"
@@ -162,6 +164,7 @@ enum conn_states {
conn_closing, /**< closing this connection */
conn_mwrite, /**< writing out many items sequentially */
conn_closed, /**< connection is closed */
+ conn_watch, /**< held by the logger thread as a watcher */
conn_max_state /**< Max state value (used for assertion) */
};
@@ -416,6 +419,7 @@ typedef struct {
struct thread_stats stats; /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache; /* suffix cache */
+ logger *l; /* logger buffer */
} LIBEVENT_THREAD;
typedef struct {
@@ -574,6 +578,7 @@ extern int daemonize(int nochdir, int noclose);
void memcached_thread_init(int nthreads, struct event_base *main_base);
int dispatch_event_add(int thread, conn *c);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport);
+void sidethread_conn_close(conn *c);
/* Lock wrappers for cache functions that are called from main loop. */
enum delta_result_type add_delta(conn *c, const char *key,
diff --git a/thread.c b/thread.c
index 0a1a859..2113dcc 100644
--- a/thread.c
+++ b/thread.c
@@ -371,6 +371,10 @@ static void *worker_libevent(void *arg) {
/* Any per-thread setup can happen here; memcached_thread_init() will block until
* all threads have finished initializing.
*/
+ me->l = logger_create();
+ if (me->l == NULL) {
+ abort();
+ }
register_thread_initialized();
@@ -464,6 +468,48 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
}
/*
+ * Re-dispatches a connection back to the original thread. Can be called from
+ * any side thread borrowing a connection.
+ * TODO: Look into this. too complicated?
+ */
+#ifdef BOGUS_DEFINE
+void redispatch_conn(conn *c) {
+ CQ_ITEM *item = cqi_new();
+ char buf[1];
+ if (item == NULL) {
+ /* Can't cleanly redispatch connection. close it forcefully. */
+ /* FIXME: is conn_cleanup() necessary?
+ * if conn was handed off to a side thread it should be clean.
+ * could also put it into a "clean_me" state?
+ */
+ c->state = conn_closed;
+ close(c->sfd);
+ return;
+ }
+ LIBEVENT_THREAD *thread = c->thread;
+ item->sfd = sfd;
+ /* pass in the state somehow?
+ item->init_state = conn_closing; */
+ item->event_flags = c->event_flags;
+ item->conn = c;
+}
+#endif
+
+/* This misses the allow_new_conns flag :( */
+void sidethread_conn_close(conn *c) {
+ c->state = conn_closed;
+ if (settings.verbose > 1)
+ fprintf(stderr, "<%d connection closed from side thread.\n", c->sfd);
+ close(c->sfd);
+
+ STATS_LOCK();
+ stats.curr_conns--;
+ STATS_UNLOCK();
+
+ return;
+}
+
+/*
* Returns true if this is the thread that listens for new TCP connections.
*/
int is_listen_thread() {