summaryrefslogtreecommitdiff
path: root/proxy_network.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-09-23 12:18:22 -0700
committerdormando <dormando@rydia.net>2022-10-20 14:42:20 -0700
commitcedaf5586883ffd2323307f45f90f4d70e94ecc2 (patch)
treecfbbd3addb6622a1908b22514ef071faa63a92ce /proxy_network.c
parent8d573b00a7c518dde1645e5f15f85e860931b763 (diff)
downloadmemcached-cedaf5586883ffd2323307f45f90f4d70e94ecc2.tar.gz
proxy: backend connection improvement
Improvements to handling of new and failed backend socket connections. Previously connections were initiated immediately, and initially from the config thread, yet completion of opening sockets wouldn't happen until a request tried to use that backend. Now we open connections via the IO thread, as well as validate new connections with a "version\r\n" command. Also fixes a couple of error conditions (parsing, backend disconnect) where clients could hang waiting for a retry time in certain conditions. Now connections should re-establish immediately and dead backends should flip into a bad fast-fail state quicker.
Diffstat (limited to 'proxy_network.c')
-rw-r--r--proxy_network.c257
1 files changed, 230 insertions, 27 deletions
diff --git a/proxy_network.c b/proxy_network.c
index 162c39b..26a0dbf 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -3,11 +3,37 @@
#include "proxy.h"
+enum proxy_be_failures {
+ P_BE_FAIL_TIMEOUT = 0,
+ P_BE_FAIL_DISCONNECTED,
+ P_BE_FAIL_CONNECTING,
+ P_BE_FAIL_READVALIDATE,
+ P_BE_FAIL_BADVALIDATE,
+ P_BE_FAIL_WRITING,
+ P_BE_FAIL_READING,
+ P_BE_FAIL_PARSING,
+};
+
+const char *proxy_be_failure_text[] = {
+ [P_BE_FAIL_TIMEOUT] = "timeout",
+ [P_BE_FAIL_DISCONNECTED] = "disconnected",
+ [P_BE_FAIL_CONNECTING] = "connecting",
+ [P_BE_FAIL_READVALIDATE] = "readvalidate",
+ [P_BE_FAIL_BADVALIDATE] = "badvalidate",
+ [P_BE_FAIL_WRITING] = "writing",
+ [P_BE_FAIL_READING] = "reading",
+ [P_BE_FAIL_PARSING] = "parsing",
+ NULL
+};
+
static void proxy_backend_handler(const int fd, const short which, void *arg);
+static void proxy_beconn_handler(const int fd, const short which, void *arg);
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
+static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg);
static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
static int _flush_pending_write(mcp_backend_t *be);
static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err);
+static void _backend_failed(mcp_backend_t *be);
static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
static int proxy_backend_drive_machine(mcp_backend_t *be);
@@ -163,10 +189,10 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
// should be safe to hold up until their timeout.
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed_ur(be);
- P_DEBUG("%s: backend failed to connect\n", __func__);
+ P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
return;
}
- P_DEBUG("%s: backend connected\n", __func__);
+ P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
be->connecting = false;
be->state = mcp_backend_read;
be->bad = false;
@@ -402,6 +428,58 @@ static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
STAT_UL(ctx);
}
+// event handler for injecting backends for processing
+// currently just for initiating connections the first time.
+static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
+ proxy_event_thread_t *t = arg;
+
+#ifdef USE_EVENTFD
+ uint64_t u;
+ if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ // Temporary error or wasn't actually ready to read somehow.
+ return;
+ }
+#else
+ char buf[1];
+ if (read(fd, buf, 1) != 1) {
+ P_DEBUG("%s: pipe read failed\n", __func__);
+ return;
+ }
+#endif
+
+ beconn_head_t head;
+ struct timeval tmp_time = t->tunables.connect;
+
+ STAILQ_INIT(&head);
+ pthread_mutex_lock(&t->mutex);
+ STAILQ_CONCAT(&head, &t->beconn_head_in);
+ pthread_mutex_unlock(&t->mutex);
+
+ // Think we should reuse this code path for manually instructing backends
+ // to disable/etc but not coding for that generically. We just need to
+ // check the state of the backend when it reaches here or some flags at
+ // least.
+ // FIXME: another ->stacked flag?
+ // Either that or remove the STAILQ code and just using an array of
+ // ptr's.
+ mcp_backend_t *be = NULL;
+ STAILQ_FOREACH(be, &head, beconn_next) {
+ be->event_thread = t;
+ int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
+ if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
+ // if we're already connected for some reason, still push it
+ // through the connection handler to keep the code unified. It
+ // will auto-wake because the socket is writeable.
+ be->connecting = true;
+ be->can_write = false;
+ _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
+ } else {
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
+ _backend_failed(be);
+ }
+ }
+}
+
// event handler for executing backend requests
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
proxy_event_thread_t *t = arg;
@@ -442,13 +520,14 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
int flags = 0;
if (be->connecting) {
- P_DEBUG("%s: deferring IO pending connecting\n", __func__);
+ P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
} else {
flags = _flush_pending_write(be);
}
if (flags == -1) {
_reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
} else {
flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT;
_set_event(be, t->base, flags, tmp_time, proxy_backend_handler);
@@ -760,12 +839,12 @@ static void proxy_backend_retry_handler(const int fd, const short which, void *a
mcp_backend_t *be = arg;
assert(which & EV_TIMEOUT);
struct timeval tmp_time = be->event_thread->tunables.retry;
- _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
}
// currently just for timeouts, but certain errors should consider a backend
// to be "bad" as well.
-// must be called before _reset_bad_backend(), so the backend is currently
+// must be called after _reset_bad_backend(), so the backend is currently
// clear.
// TODO (v2): currently only notes for "bad backends" in cases of timeouts or
// connect failures. We need a specific connect() handler that executes a
@@ -785,20 +864,10 @@ static void _backend_failed(mcp_backend_t *be) {
STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
} else {
STAT_INCR(be->event_thread->ctx, backend_failed, 1);
- _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
}
}
-const char *proxy_be_failure_text[] = {
- [P_BE_FAIL_TIMEOUT] = "timeout",
- [P_BE_FAIL_DISCONNECTED] = "disconnected",
- [P_BE_FAIL_CONNECTING] = "connecting",
- [P_BE_FAIL_WRITING] = "writing",
- [P_BE_FAIL_READING] = "reading",
- [P_BE_FAIL_PARSING] = "parsing",
- NULL
-};
-
// TODO (v2): add a second argument for assigning a specific error to all pending
// IO's (ie; timeout).
// The backend has gotten into a bad state (timed out, protocol desync, or
@@ -932,16 +1001,34 @@ static int _flush_pending_write(mcp_backend_t *be) {
return flags;
}
-// The libevent backend callback handler.
-// If we end up resetting a backend, it will get put back into a connecting
-// state.
-static void proxy_backend_handler(const int fd, const short which, void *arg) {
+static int _beconn_send_validate(mcp_backend_t *be) {
+ const char *str = "version\r\n";
+ const ssize_t len = strlen(str);
+
+ ssize_t res = write(mcmc_fd(be->client), str, len);
+
+ if (res == -1) {
+ return -1;
+ }
+
+ // I'm making an opinionated statement that we should be able to write
+ // "version\r\n" into a fresh socket without hitting EAGAIN.
+ if (res < len) {
+ return -1;
+ }
+
+ return 1;
+}
+
+// Libevent handler for backends in a connecting state.
+static void proxy_beconn_handler(const int fd, const short which, void *arg) {
+ assert(arg != NULL);
mcp_backend_t *be = arg;
int flags = EV_TIMEOUT;
struct timeval tmp_time = be->event_thread->tunables.read;
if (which & EV_TIMEOUT) {
- P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
+ P_DEBUG("%s: backend timed out while connecting\n", __func__);
_reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
_backend_failed(be);
return;
@@ -949,9 +1036,7 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
if (which & EV_WRITE) {
be->can_write = true;
- // TODO (v2): move connect routine to its own function?
- // - hard to do right now because we can't (easily?) edit libevent
- // events.
+
if (be->connecting) {
int err = 0;
// We were connecting, now ensure we're properly connected.
@@ -961,20 +1046,127 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
// should be safe to hold up until their timeout.
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed(be);
- P_DEBUG("%s: backend failed to connect\n", __func__);
+ P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
return;
}
- P_DEBUG("%s: backend connected\n", __func__);
+ P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
be->connecting = false;
be->state = mcp_backend_read;
be->bad = false;
be->failed_count = 0;
+
+ be->validating = true;
+ // TODO: make validation optional.
+
+ if (_beconn_send_validate(be) == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
+ _backend_failed(be);
+ return;
+ } else {
+ // buffer should be empty during validation stage.
+ assert(be->rbufused == 0);
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ }
+ }
+
+ // TODO: currently never taken, until validation is made optional.
+ if (!be->validating) {
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
+ return;
+ }
+ flags |= res;
}
+ }
+
+ if (which & EV_READ) {
+ assert(be->validating);
+
+ int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused, READ_BUFFER_SIZE - be->rbufused, 0);
+ if (read > 0) {
+ mcmc_resp_t r;
+ be->rbufused += read;
+
+ int status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r);
+ if (status == MCMC_ERR) {
+ // Needed more data for a version line, somehow. I feel like
+ // this should set off some alarms, but it is possible.
+ if (r.code == MCMC_WANT_READ) {
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ return;
+ }
+
+ _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
+ _backend_failed(be);
+ return;
+ }
+
+ if (r.code != MCMC_CODE_VERSION) {
+ _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
+ _backend_failed(be);
+ return;
+ }
+
+ be->validating = false;
+ be->rbufused = 0;
+ } else if (read == 0) {
+ // not connected or error.
+ _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ _backend_failed(be);
+ return;
+ } else if (read == -1) {
+ // sit on epoll again.
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ _reset_bad_backend(be, P_BE_FAIL_READING);
+ _backend_failed(be);
+ return;
+ }
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ }
+
+ // Passed validation, don't need to re-read, flush any pending writes.
int res = _flush_pending_write(be);
if (res == -1) {
_reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
return;
}
+ flags |= res;
+ }
+
+ // Still pending requests to read or write.
+ if (!be->validating && !STAILQ_EMPTY(&be->io_head)) {
+ _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler);
+ }
+
+}
+
+// The libevent backend callback handler.
+// If we end up resetting a backend, it will get put back into a connecting
+// state.
+static void proxy_backend_handler(const int fd, const short which, void *arg) {
+ mcp_backend_t *be = arg;
+ int flags = EV_TIMEOUT;
+ struct timeval tmp_time = be->event_thread->tunables.read;
+
+ if (which & EV_TIMEOUT) {
+ P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
+ _reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
+ _backend_failed(be);
+ return;
+ }
+
+ if (which & EV_WRITE) {
+ be->can_write = true;
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
+ return;
+ }
+ flags |= res;
}
if (which & EV_READ) {
@@ -987,16 +1179,19 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
int res = proxy_backend_drive_machine(be);
if (res == -1) {
_reset_bad_backend(be, P_BE_FAIL_PARSING);
+ _backend_failed(be);
return;
}
} else if (read == 0) {
// not connected or error.
_reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ _backend_failed(be);
return;
} else if (read == -1) {
// sit on epoll again.
if (errno != EAGAIN && errno != EWOULDBLOCK) {
_reset_bad_backend(be, P_BE_FAIL_READING);
+ _backend_failed(be);
return;
}
}
@@ -1092,9 +1287,13 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
#ifdef USE_EVENTFD
event_set(&t->notify_event, t->event_fd,
EV_READ | EV_PERSIST, proxy_event_handler, t);
+ event_set(&t->beconn_event, t->be_event_fd,
+ EV_READ | EV_PERSIST, proxy_event_beconn, t);
#else
event_set(&t->notify_event, t->notify_receive_fd,
EV_READ | EV_PERSIST, proxy_event_handler, t);
+ event_set(&t->beconn_event, t->be_notify_receive_fd,
+ EV_READ | EV_PERSIST, proxy_event_beconn, t);
#endif
evtimer_set(&t->clock_event, proxy_event_updater, t);
@@ -1107,7 +1306,11 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
-
+ event_base_set(t->base, &t->beconn_event);
+ if (event_add(&t->beconn_event, 0) == -1) {
+ fprintf(stderr, "Can't monitor libevent notify pipe\n");
+ exit(1);
+ }
}