summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/protocol.txt7
-rw-r--r--logger.c100
-rw-r--r--logger.h11
-rw-r--r--memcached.c17
-rw-r--r--memcached.h8
-rw-r--r--proto_bin.c1
-rw-r--r--proto_text.c5
-rw-r--r--t/watcher.t36
8 files changed, 183 insertions, 2 deletions
diff --git a/doc/protocol.txt b/doc/protocol.txt
index e059d2d..b34690b 100644
--- a/doc/protocol.txt
+++ b/doc/protocol.txt
@@ -1125,7 +1125,7 @@ Watchers are a way to connect to memcached and inspect what's going on
internally. This is an evolving feature so new endpoints should show up over
time.
-watch <fetchers|mutations|evictions>
+watch <fetchers|mutations|evictions|connevents>
- Turn connection into a watcher. Options can be stacked and are
space-separated. Logs will be sent to the watcher until it disconnects.
@@ -1160,6 +1160,11 @@ The arguments are:
cache. Useful in seeing if items being evicted were actually used, and which
keys are getting removed.
+- "connevents": Emits logs when connections are opened and closed, i.e. when
+ clients connect or disconnect. For TCP transports, logs indicate the remote
+ address IP and port. Connection close events additionally supply a reason for
+ closing the connection.
+
Statistics
----------
diff --git a/logger.c b/logger.c
index 06693c3..16cd22e 100644
--- a/logger.c
+++ b/logger.c
@@ -1,5 +1,6 @@
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+#include <arpa/inet.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
@@ -56,6 +57,8 @@ static const entry_details default_entries[] = {
[LOGGER_SLAB_MOVE] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
"type=slab_move src=%d dst=%d"
},
+ [LOGGER_CONNECTION_NEW] = {LOGGER_CONNECTION_NEW_ENTRY, 512, LOG_CONNEVENTS, NULL},
+ [LOGGER_CONNECTION_CLOSE] = {LOGGER_CONNECTION_CLOSE_ENTRY, 512, LOG_CONNEVENTS, NULL},
#ifdef EXTSTORE
[LOGGER_EXTSTORE_WRITE] = {LOGGER_EXT_WRITE_ENTRY, 512, LOG_EVICTIONS, NULL},
[LOGGER_COMPACT_START] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS,
@@ -177,6 +180,38 @@ static void logger_set_flags(void) {
}
/*************************
+ * Util functions used by the logger background thread
+ *************************/
+
+static int _logger_util_addr_endpoint(struct sockaddr *addr, char *rip,
+ size_t riplen, unsigned short *rport) {
+ memset(rip, 0, riplen);
+
+ switch (addr->sa_family) {
+ case AF_INET:
+ inet_ntop(AF_INET, &((struct sockaddr_in *) addr)->sin_addr,
+ rip, riplen - 1);
+ *rport = ntohs(((struct sockaddr_in *) addr)->sin_port);
+ break;
+ case AF_INET6:
+ inet_ntop(AF_INET6, &((struct sockaddr_in6 *) addr)->sin6_addr,
+ rip, riplen - 1);
+ *rport = ntohs(((struct sockaddr_in6 *) addr)->sin6_port);
+ break;
+#ifndef DISABLE_UNIX_SOCKET
+ // Connections on Unix socket transports have c->request_addr zeroed out.
+ case AF_UNSPEC:
+ case AF_UNIX:
+ strncpy(rip, "unix", strlen("unix") + 1);
+ *rport = 0;
+ break;
+#endif /* #ifndef DISABLE_UNIX_SOCKET */
+ }
+
+ return 0;
+}
+
+/*************************
* Logger background thread functions. Aggregates per-worker buffers and
* writes to any watchers.
*************************/
@@ -235,6 +270,7 @@ static int _logger_thread_parse_ee(logentry *e, char *scratch) {
return total;
}
+
#ifdef EXTSTORE
static int _logger_thread_parse_extw(logentry *e, char *scratch) {
int total;
@@ -250,6 +286,43 @@ static int _logger_thread_parse_extw(logentry *e, char *scratch) {
return total;
}
#endif
+
+static int _logger_thread_parse_cne(logentry *e, char *scratch) {
+ int total;
+ unsigned short rport;
+ char rip[64];
+ struct logentry_conn_event *le = (struct logentry_conn_event *) e->data;
+ const char * const transport_map[] = { "local", "tcp", "udp" };
+
+ _logger_util_addr_endpoint(&le->addr, rip, sizeof(rip), &rport);
+
+ total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
+ "ts=%d.%d gid=%llu type=conn_new rip=%s rport=%hu transport=%s cfd=%d\n",
+ (int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
+ rip, rport, transport_map[le->transport], le->sfd);
+
+ return total;
+}
+
+static int _logger_thread_parse_cce(logentry *e, char *scratch) {
+ int total;
+ unsigned short rport;
+ char rip[64];
+ struct logentry_conn_event *le = (struct logentry_conn_event *) e->data;
+ const char * const transport_map[] = { "local", "tcp", "udp" };
+ const char * const reason_map[] = { "error", "normal", "idle_timeout", "shutdown" };
+
+ _logger_util_addr_endpoint(&le->addr, rip, sizeof(rip), &rport);
+
+ total = snprintf(scratch, LOGGER_PARSE_SCRATCH,
+ "ts=%d.%d gid=%llu type=conn_close rip=%s rport=%hu transport=%s reason=%s cfd=%d\n",
+ (int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid,
+ rip, rport, transport_map[le->transport],
+ reason_map[le->reason], le->sfd);
+
+ return total;
+}
+
/* Completes rendering of log line. */
static enum logger_parse_entry_ret logger_thread_parse_entry(logentry *e, struct logger_stats *ls,
char *scratch, int *scratch_len) {
@@ -275,6 +348,12 @@ static enum logger_parse_entry_ret logger_thread_parse_entry(logentry *e, struct
case LOGGER_ITEM_STORE_ENTRY:
total = _logger_thread_parse_ise(e, scratch);
break;
+ case LOGGER_CONNECTION_NEW_ENTRY:
+ total = _logger_thread_parse_cne(e, scratch);
+ break;
+ case LOGGER_CONNECTION_CLOSE_ENTRY:
+ total = _logger_thread_parse_cce(e, scratch);
+ break;
}
@@ -696,6 +775,17 @@ static void _logger_log_item_store(logentry *e, const enum store_item_type statu
e->size = sizeof(struct logentry_item_store) + nkey;
}
+static void _logger_log_conn_event(logentry *e, struct sockaddr *addr,
+ enum network_transport transport, enum close_reasons reason, int sfd) {
+ struct logentry_conn_event *le = (struct logentry_conn_event *) e->data;
+
+ memcpy(&le->addr, addr, sizeof(struct sockaddr));
+ le->sfd = sfd;
+ le->transport = transport;
+ le->reason = reason;
+ e->size = sizeof(struct logentry_conn_event);
+}
+
/* Public function for logging an entry.
* Tries to encapsulate as much of the formatting as possible to simplify the
* caller's code.
@@ -777,6 +867,16 @@ enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, cons
_logger_log_item_store(e, status, comm, skey, snkey, snbytes, sttl, sclsid, ssfd);
va_end(ap);
break;
+ case LOGGER_CONNECTION_NEW_ENTRY:
+ case LOGGER_CONNECTION_CLOSE_ENTRY:
+ va_start(ap, entry);
+ struct sockaddr *addr = va_arg(ap, struct sockaddr *);
+ enum network_transport transport = va_arg(ap, enum network_transport);
+ enum close_reasons reason = va_arg(ap, enum close_reasons);
+ int csfd = va_arg(ap, int);
+ va_end(ap);
+ _logger_log_conn_event(e, addr, transport, reason, csfd);
+ break;
}
#ifdef NEED_ALIGN
diff --git a/logger.h b/logger.h
index 715f3af..5492f72 100644
--- a/logger.h
+++ b/logger.h
@@ -20,6 +20,8 @@ enum log_entry_type {
LOGGER_ITEM_STORE,
LOGGER_CRAWLER_STATUS,
LOGGER_SLAB_MOVE,
+ LOGGER_CONNECTION_NEW,
+ LOGGER_CONNECTION_CLOSE,
#ifdef EXTSTORE
LOGGER_EXTSTORE_WRITE,
LOGGER_COMPACT_START,
@@ -36,6 +38,8 @@ enum log_entry_subtype {
LOGGER_EVICTION_ENTRY,
LOGGER_ITEM_GET_ENTRY,
LOGGER_ITEM_STORE_ENTRY,
+ LOGGER_CONNECTION_NEW_ENTRY,
+ LOGGER_CONNECTION_CLOSE_ENTRY,
#ifdef EXTSTORE
LOGGER_EXT_WRITE_ENTRY,
#endif
@@ -101,6 +105,13 @@ struct logentry_item_store {
char key[];
};
+struct logentry_conn_event {
+ int transport;
+ int reason;
+ int sfd;
+ struct sockaddr addr;
+};
+
/* end intermediary structures */
/* WARNING: cuddled items aren't compatible with warm restart. more code
diff --git a/memcached.c b/memcached.c
index b8de7e8..60c027f 100644
--- a/memcached.c
+++ b/memcached.c
@@ -526,6 +526,8 @@ void conn_close_idle(conn *c) {
c->thread->stats.idle_kicks++;
pthread_mutex_unlock(&c->thread->stats.mutex);
+ c->close_reason = IDLE_TIMEOUT_CLOSE;
+
conn_set_state(c, conn_closing);
drive_machine(c);
}
@@ -672,6 +674,11 @@ conn *conn_new(const int sfd, enum conn_states init_state,
}
}
+ if (init_state == conn_new_cmd) {
+ LOGGER_LOG(NULL, LOG_CONNEVENTS, LOGGER_CONNECTION_NEW, NULL,
+ (struct sockaddr *) &c->request_addr, c->transport, 0, sfd);
+ }
+
if (settings.verbose > 1) {
if (init_state == conn_listening) {
fprintf(stderr, "<%d server listening (%s)\n", sfd,
@@ -855,6 +862,12 @@ void conn_free(conn *c) {
static void conn_close(conn *c) {
assert(c != NULL);
+ if (c->thread) {
+ LOGGER_LOG(c->thread->l, LOG_CONNEVENTS, LOGGER_CONNECTION_CLOSE, NULL,
+ (struct sockaddr *) &c->request_addr, c->transport,
+ c->close_reason, c->sfd);
+ }
+
/* delete the event, the socket and the conn */
event_del(&c->event);
@@ -878,6 +891,7 @@ static void conn_close(conn *c) {
}
#endif
close(c->sfd);
+ c->close_reason = 0;
pthread_mutex_lock(&conn_lock);
allow_new_conns = true;
pthread_mutex_unlock(&conn_lock);
@@ -2378,6 +2392,7 @@ static enum try_read_result try_read_network(conn *c) {
}
}
if (res == 0) {
+ c->close_reason = NORMAL_CLOSE;
return READ_ERROR;
}
if (res == -1) {
@@ -3118,6 +3133,7 @@ static void drive_machine(conn *c) {
}
if (res == 0) { /* end of stream */
+ c->close_reason = NORMAL_CLOSE;
conn_set_state(c, conn_closing);
break;
}
@@ -3183,6 +3199,7 @@ static void drive_machine(conn *c) {
break;
}
if (res == 0) { /* end of stream */
+ c->close_reason = NORMAL_CLOSE;
conn_set_state(c, conn_closing);
break;
}
diff --git a/memcached.h b/memcached.h
index 8648f25..739c939 100644
--- a/memcached.h
+++ b/memcached.h
@@ -245,6 +245,13 @@ enum stop_reasons {
EXIT_NORMALLY
};
+enum close_reasons {
+ ERROR_CLOSE,
+ NORMAL_CLOSE,
+ IDLE_TIMEOUT_CLOSE,
+ SHUTDOWN_CLOSE,
+};
+
#define IS_TCP(x) (x == tcp_transport)
#define IS_UDP(x) (x == udp_transport)
@@ -773,6 +780,7 @@ struct conn {
#endif
enum protocol protocol; /* which protocol this connection speaks */
enum network_transport transport; /* what transport is used by this connection */
+ enum close_reasons close_reason; /* reason for transition into conn_closing */
/* data for UDP clients */
int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
diff --git a/proto_bin.c b/proto_bin.c
index 70b196f..a6bf520 100644
--- a/proto_bin.c
+++ b/proto_bin.c
@@ -1032,6 +1032,7 @@ static void dispatch_bin_command(conn *c, char *extbuf) {
write_bin_response(c, NULL, 0, 0, 0);
conn_set_state(c, conn_mwrite);
c->close_after_write = true;
+ c->close_reason = NORMAL_CLOSE;
} else {
protocol_error = 1;
}
diff --git a/proto_text.c b/proto_text.c
index 22be817..000ddd1 100644
--- a/proto_text.c
+++ b/proto_text.c
@@ -2269,6 +2269,8 @@ static void process_watch_command(conn *c, token_t *tokens, const size_t ntokens
f |= LOG_MUTATIONS;
} else if ((strcmp(tokens[x].value, "sysevents") == 0)) {
f |= LOG_SYSEVENTS;
+ } else if ((strcmp(tokens[x].value, "connevents") == 0)) {
+ f |= LOG_CONNEVENTS;
} else {
out_string(c, "ERROR");
return;
@@ -2485,6 +2487,7 @@ static void process_version_command(conn *c) {
static void process_quit_command(conn *c) {
conn_set_state(c, conn_mwrite);
c->close_after_write = true;
+ c->close_reason = NORMAL_CLOSE;
}
static void process_shutdown_command(conn *c, token_t *tokens, const size_t ntokens) {
@@ -2494,9 +2497,11 @@ static void process_shutdown_command(conn *c, token_t *tokens, const size_t ntok
}
if (ntokens == 2) {
+ c->close_reason = SHUTDOWN_CLOSE;
conn_set_state(c, conn_closing);
raise(SIGINT);
} else if (ntokens == 3 && strcmp(tokens[SUBCOMMAND_TOKEN].value, "graceful") == 0) {
+ c->close_reason = SHUTDOWN_CLOSE;
conn_set_state(c, conn_closing);
raise(SIGUSR1);
} else {
diff --git a/t/watcher.t b/t/watcher.t
index 43dd7a6..c624fb9 100644
--- a/t/watcher.t
+++ b/t/watcher.t
@@ -5,7 +5,7 @@ use strict;
use warnings;
use Socket qw/SO_RCVBUF/;
-use Test::More tests => 20;
+use Test::More tests => 25;
use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;
@@ -72,6 +72,40 @@ if ($res eq "STORED\r\n") {
}
}
+# test connection events
+{
+ # start a dedicated server so that connection close events from previous
+ # tests don't leak into this one due to races.
+ my $conn_server = new_memcached('-m 60 -o watcher_logbuf_size=8');
+ my $conn_watcher = $conn_server->new_sock;
+
+ print $conn_watcher "watch connevents\n";
+ $res = <$conn_watcher>;
+ is($res, "OK\r\n", 'connevents watcher enabled');
+
+ # normal close
+ my $conn_client = $conn_server->new_sock;
+ print $conn_client "version\r\n";
+ $res = <$conn_client>;
+ print $conn_client "quit\r\n";
+ $res = <$conn_watcher>;
+ like($res, qr/ts=\d+\.\d+\ gid=\d+ type=conn_new .+ transport=local/,
+ 'logged new connection');
+ $res = <$conn_watcher>;
+ like($res, qr/ts=\d+\.\d+\ gid=\d+ type=conn_close .+ transport=local reason=normal/,
+ 'logged closed connection due to client disconnect');
+
+ # error close
+ $conn_client = $conn_server->new_sock;
+ print $conn_client "GET / HTTP/1.1\r\n";
+ $res = <$conn_watcher>;
+ like($res, qr/ts=\d+\.\d+\ gid=\d+ type=conn_new .+ transport=local/,
+ 'logged new connection');
+ $res = <$conn_watcher>;
+ like($res, qr/ts=\d+\.\d+\ gid=\d+ type=conn_close .+ transport=local reason=error/,
+ 'logged closed connection due to client protocol error');
+}
+
# test combined logs
# fill to evictions, then enable watcher, set again, and look for both lines