From 617d7cd64d04698b76fee74882627690017e20ad Mon Sep 17 00:00:00 2001 From: Kevin Lin Date: Sat, 29 May 2021 22:19:43 -0700 Subject: Implement LOG_CONNEVENTS watcher flag for connection state transitions Add support for `watch connevents` to report opened (`conn_new`) and closed (`conn_close`) client connections. Event log lines indicate the connection's remote IP, remote port, and transport type. `conn_close` events additionally supply a reason for the closing the connection. --- doc/protocol.txt | 7 +++- logger.c | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ logger.h | 11 ++++++ memcached.c | 17 ++++++++++ memcached.h | 8 +++++ proto_bin.c | 1 + proto_text.c | 5 +++ t/watcher.t | 36 +++++++++++++++++++- 8 files changed, 183 insertions(+), 2 deletions(-) diff --git a/doc/protocol.txt b/doc/protocol.txt index e059d2d..b34690b 100644 --- a/doc/protocol.txt +++ b/doc/protocol.txt @@ -1125,7 +1125,7 @@ Watchers are a way to connect to memcached and inspect what's going on internally. This is an evolving feature so new endpoints should show up over time. -watch +watch - Turn connection into a watcher. Options can be stacked and are space-separated. Logs will be sent to the watcher until it disconnects. @@ -1160,6 +1160,11 @@ The arguments are: cache. Useful in seeing if items being evicted were actually used, and which keys are getting removed. +- "connevents": Emits logs when connections are opened and closed, i.e. when + clients connect or disconnect. For TCP transports, logs indicate the remote + address IP and port. Connection close events additionally supply a reason for + closing the connection. + Statistics ---------- diff --git a/logger.c b/logger.c index 06693c3..16cd22e 100644 --- a/logger.c +++ b/logger.c @@ -1,5 +1,6 @@ /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +#include #include #include #include @@ -56,6 +57,8 @@ static const entry_details default_entries[] = { [LOGGER_SLAB_MOVE] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS, "type=slab_move src=%d dst=%d" }, + [LOGGER_CONNECTION_NEW] = {LOGGER_CONNECTION_NEW_ENTRY, 512, LOG_CONNEVENTS, NULL}, + [LOGGER_CONNECTION_CLOSE] = {LOGGER_CONNECTION_CLOSE_ENTRY, 512, LOG_CONNEVENTS, NULL}, #ifdef EXTSTORE [LOGGER_EXTSTORE_WRITE] = {LOGGER_EXT_WRITE_ENTRY, 512, LOG_EVICTIONS, NULL}, [LOGGER_COMPACT_START] = {LOGGER_TEXT_ENTRY, 512, LOG_SYSEVENTS, @@ -176,6 +179,38 @@ static void logger_set_flags(void) { return; } +/************************* + * Util functions used by the logger background thread + *************************/ + +static int _logger_util_addr_endpoint(struct sockaddr *addr, char *rip, + size_t riplen, unsigned short *rport) { + memset(rip, 0, riplen); + + switch (addr->sa_family) { + case AF_INET: + inet_ntop(AF_INET, &((struct sockaddr_in *) addr)->sin_addr, + rip, riplen - 1); + *rport = ntohs(((struct sockaddr_in *) addr)->sin_port); + break; + case AF_INET6: + inet_ntop(AF_INET6, &((struct sockaddr_in6 *) addr)->sin6_addr, + rip, riplen - 1); + *rport = ntohs(((struct sockaddr_in6 *) addr)->sin6_port); + break; +#ifndef DISABLE_UNIX_SOCKET + // Connections on Unix socket transports have c->request_addr zeroed out. + case AF_UNSPEC: + case AF_UNIX: + strncpy(rip, "unix", strlen("unix") + 1); + *rport = 0; + break; +#endif /* #ifndef DISABLE_UNIX_SOCKET */ + } + + return 0; +} + /************************* * Logger background thread functions. Aggregates per-worker buffers and * writes to any watchers. @@ -235,6 +270,7 @@ static int _logger_thread_parse_ee(logentry *e, char *scratch) { return total; } + #ifdef EXTSTORE static int _logger_thread_parse_extw(logentry *e, char *scratch) { int total; @@ -250,6 +286,43 @@ static int _logger_thread_parse_extw(logentry *e, char *scratch) { return total; } #endif + +static int _logger_thread_parse_cne(logentry *e, char *scratch) { + int total; + unsigned short rport; + char rip[64]; + struct logentry_conn_event *le = (struct logentry_conn_event *) e->data; + const char * const transport_map[] = { "local", "tcp", "udp" }; + + _logger_util_addr_endpoint(&le->addr, rip, sizeof(rip), &rport); + + total = snprintf(scratch, LOGGER_PARSE_SCRATCH, + "ts=%d.%d gid=%llu type=conn_new rip=%s rport=%hu transport=%s cfd=%d\n", + (int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid, + rip, rport, transport_map[le->transport], le->sfd); + + return total; +} + +static int _logger_thread_parse_cce(logentry *e, char *scratch) { + int total; + unsigned short rport; + char rip[64]; + struct logentry_conn_event *le = (struct logentry_conn_event *) e->data; + const char * const transport_map[] = { "local", "tcp", "udp" }; + const char * const reason_map[] = { "error", "normal", "idle_timeout", "shutdown" }; + + _logger_util_addr_endpoint(&le->addr, rip, sizeof(rip), &rport); + + total = snprintf(scratch, LOGGER_PARSE_SCRATCH, + "ts=%d.%d gid=%llu type=conn_close rip=%s rport=%hu transport=%s reason=%s cfd=%d\n", + (int) e->tv.tv_sec, (int) e->tv.tv_usec, (unsigned long long) e->gid, + rip, rport, transport_map[le->transport], + reason_map[le->reason], le->sfd); + + return total; +} + /* Completes rendering of log line. */ static enum logger_parse_entry_ret logger_thread_parse_entry(logentry *e, struct logger_stats *ls, char *scratch, int *scratch_len) { @@ -275,6 +348,12 @@ static enum logger_parse_entry_ret logger_thread_parse_entry(logentry *e, struct case LOGGER_ITEM_STORE_ENTRY: total = _logger_thread_parse_ise(e, scratch); break; + case LOGGER_CONNECTION_NEW_ENTRY: + total = _logger_thread_parse_cne(e, scratch); + break; + case LOGGER_CONNECTION_CLOSE_ENTRY: + total = _logger_thread_parse_cce(e, scratch); + break; } @@ -696,6 +775,17 @@ static void _logger_log_item_store(logentry *e, const enum store_item_type statu e->size = sizeof(struct logentry_item_store) + nkey; } +static void _logger_log_conn_event(logentry *e, struct sockaddr *addr, + enum network_transport transport, enum close_reasons reason, int sfd) { + struct logentry_conn_event *le = (struct logentry_conn_event *) e->data; + + memcpy(&le->addr, addr, sizeof(struct sockaddr)); + le->sfd = sfd; + le->transport = transport; + le->reason = reason; + e->size = sizeof(struct logentry_conn_event); +} + /* Public function for logging an entry. * Tries to encapsulate as much of the formatting as possible to simplify the * caller's code. @@ -777,6 +867,16 @@ enum logger_ret_type logger_log(logger *l, const enum log_entry_type event, cons _logger_log_item_store(e, status, comm, skey, snkey, snbytes, sttl, sclsid, ssfd); va_end(ap); break; + case LOGGER_CONNECTION_NEW_ENTRY: + case LOGGER_CONNECTION_CLOSE_ENTRY: + va_start(ap, entry); + struct sockaddr *addr = va_arg(ap, struct sockaddr *); + enum network_transport transport = va_arg(ap, enum network_transport); + enum close_reasons reason = va_arg(ap, enum close_reasons); + int csfd = va_arg(ap, int); + va_end(ap); + _logger_log_conn_event(e, addr, transport, reason, csfd); + break; } #ifdef NEED_ALIGN diff --git a/logger.h b/logger.h index 715f3af..5492f72 100644 --- a/logger.h +++ b/logger.h @@ -20,6 +20,8 @@ enum log_entry_type { LOGGER_ITEM_STORE, LOGGER_CRAWLER_STATUS, LOGGER_SLAB_MOVE, + LOGGER_CONNECTION_NEW, + LOGGER_CONNECTION_CLOSE, #ifdef EXTSTORE LOGGER_EXTSTORE_WRITE, LOGGER_COMPACT_START, @@ -36,6 +38,8 @@ enum log_entry_subtype { LOGGER_EVICTION_ENTRY, LOGGER_ITEM_GET_ENTRY, LOGGER_ITEM_STORE_ENTRY, + LOGGER_CONNECTION_NEW_ENTRY, + LOGGER_CONNECTION_CLOSE_ENTRY, #ifdef EXTSTORE LOGGER_EXT_WRITE_ENTRY, #endif @@ -101,6 +105,13 @@ struct logentry_item_store { char key[]; }; +struct logentry_conn_event { + int transport; + int reason; + int sfd; + struct sockaddr addr; +}; + /* end intermediary structures */ /* WARNING: cuddled items aren't compatible with warm restart. more code diff --git a/memcached.c b/memcached.c index b8de7e8..60c027f 100644 --- a/memcached.c +++ b/memcached.c @@ -526,6 +526,8 @@ void conn_close_idle(conn *c) { c->thread->stats.idle_kicks++; pthread_mutex_unlock(&c->thread->stats.mutex); + c->close_reason = IDLE_TIMEOUT_CLOSE; + conn_set_state(c, conn_closing); drive_machine(c); } @@ -672,6 +674,11 @@ conn *conn_new(const int sfd, enum conn_states init_state, } } + if (init_state == conn_new_cmd) { + LOGGER_LOG(NULL, LOG_CONNEVENTS, LOGGER_CONNECTION_NEW, NULL, + (struct sockaddr *) &c->request_addr, c->transport, 0, sfd); + } + if (settings.verbose > 1) { if (init_state == conn_listening) { fprintf(stderr, "<%d server listening (%s)\n", sfd, @@ -855,6 +862,12 @@ void conn_free(conn *c) { static void conn_close(conn *c) { assert(c != NULL); + if (c->thread) { + LOGGER_LOG(c->thread->l, LOG_CONNEVENTS, LOGGER_CONNECTION_CLOSE, NULL, + (struct sockaddr *) &c->request_addr, c->transport, + c->close_reason, c->sfd); + } + /* delete the event, the socket and the conn */ event_del(&c->event); @@ -878,6 +891,7 @@ static void conn_close(conn *c) { } #endif close(c->sfd); + c->close_reason = 0; pthread_mutex_lock(&conn_lock); allow_new_conns = true; pthread_mutex_unlock(&conn_lock); @@ -2378,6 +2392,7 @@ static enum try_read_result try_read_network(conn *c) { } } if (res == 0) { + c->close_reason = NORMAL_CLOSE; return READ_ERROR; } if (res == -1) { @@ -3118,6 +3133,7 @@ static void drive_machine(conn *c) { } if (res == 0) { /* end of stream */ + c->close_reason = NORMAL_CLOSE; conn_set_state(c, conn_closing); break; } @@ -3183,6 +3199,7 @@ static void drive_machine(conn *c) { break; } if (res == 0) { /* end of stream */ + c->close_reason = NORMAL_CLOSE; conn_set_state(c, conn_closing); break; } diff --git a/memcached.h b/memcached.h index 8648f25..739c939 100644 --- a/memcached.h +++ b/memcached.h @@ -245,6 +245,13 @@ enum stop_reasons { EXIT_NORMALLY }; +enum close_reasons { + ERROR_CLOSE, + NORMAL_CLOSE, + IDLE_TIMEOUT_CLOSE, + SHUTDOWN_CLOSE, +}; + #define IS_TCP(x) (x == tcp_transport) #define IS_UDP(x) (x == udp_transport) @@ -773,6 +780,7 @@ struct conn { #endif enum protocol protocol; /* which protocol this connection speaks */ enum network_transport transport; /* what transport is used by this connection */ + enum close_reasons close_reason; /* reason for transition into conn_closing */ /* data for UDP clients */ int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ diff --git a/proto_bin.c b/proto_bin.c index 70b196f..a6bf520 100644 --- a/proto_bin.c +++ b/proto_bin.c @@ -1032,6 +1032,7 @@ static void dispatch_bin_command(conn *c, char *extbuf) { write_bin_response(c, NULL, 0, 0, 0); conn_set_state(c, conn_mwrite); c->close_after_write = true; + c->close_reason = NORMAL_CLOSE; } else { protocol_error = 1; } diff --git a/proto_text.c b/proto_text.c index 22be817..000ddd1 100644 --- a/proto_text.c +++ b/proto_text.c @@ -2269,6 +2269,8 @@ static void process_watch_command(conn *c, token_t *tokens, const size_t ntokens f |= LOG_MUTATIONS; } else if ((strcmp(tokens[x].value, "sysevents") == 0)) { f |= LOG_SYSEVENTS; + } else if ((strcmp(tokens[x].value, "connevents") == 0)) { + f |= LOG_CONNEVENTS; } else { out_string(c, "ERROR"); return; @@ -2485,6 +2487,7 @@ static void process_version_command(conn *c) { static void process_quit_command(conn *c) { conn_set_state(c, conn_mwrite); c->close_after_write = true; + c->close_reason = NORMAL_CLOSE; } static void process_shutdown_command(conn *c, token_t *tokens, const size_t ntokens) { @@ -2494,9 +2497,11 @@ static void process_shutdown_command(conn *c, token_t *tokens, const size_t ntok } if (ntokens == 2) { + c->close_reason = SHUTDOWN_CLOSE; conn_set_state(c, conn_closing); raise(SIGINT); } else if (ntokens == 3 && strcmp(tokens[SUBCOMMAND_TOKEN].value, "graceful") == 0) { + c->close_reason = SHUTDOWN_CLOSE; conn_set_state(c, conn_closing); raise(SIGUSR1); } else { diff --git a/t/watcher.t b/t/watcher.t index 43dd7a6..c624fb9 100644 --- a/t/watcher.t +++ b/t/watcher.t @@ -5,7 +5,7 @@ use strict; use warnings; use Socket qw/SO_RCVBUF/; -use Test::More tests => 20; +use Test::More tests => 25; use FindBin qw($Bin); use lib "$Bin/lib"; use MemcachedTest; @@ -72,6 +72,40 @@ if ($res eq "STORED\r\n") { } } +# test connection events +{ + # start a dedicated server so that connection close events from previous + # tests don't leak into this one due to races. + my $conn_server = new_memcached('-m 60 -o watcher_logbuf_size=8'); + my $conn_watcher = $conn_server->new_sock; + + print $conn_watcher "watch connevents\n"; + $res = <$conn_watcher>; + is($res, "OK\r\n", 'connevents watcher enabled'); + + # normal close + my $conn_client = $conn_server->new_sock; + print $conn_client "version\r\n"; + $res = <$conn_client>; + print $conn_client "quit\r\n"; + $res = <$conn_watcher>; + like($res, qr/ts=\d+\.\d+\ gid=\d+ type=conn_new .+ transport=local/, + 'logged new connection'); + $res = <$conn_watcher>; + like($res, qr/ts=\d+\.\d+\ gid=\d+ type=conn_close .+ transport=local reason=normal/, + 'logged closed connection due to client disconnect'); + + # error close + $conn_client = $conn_server->new_sock; + print $conn_client "GET / HTTP/1.1\r\n"; + $res = <$conn_watcher>; + like($res, qr/ts=\d+\.\d+\ gid=\d+ type=conn_new .+ transport=local/, + 'logged new connection'); + $res = <$conn_watcher>; + like($res, qr/ts=\d+\.\d+\ gid=\d+ type=conn_close .+ transport=local reason=error/, + 'logged closed connection due to client protocol error'); +} + # test combined logs # fill to evictions, then enable watcher, set again, and look for both lines -- cgit v1.2.1