summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-09-23 12:18:22 -0700
committerdormando <dormando@rydia.net>2022-10-20 14:42:20 -0700
commitcedaf5586883ffd2323307f45f90f4d70e94ecc2 (patch)
treecfbbd3addb6622a1908b22514ef071faa63a92ce
parent8d573b00a7c518dde1645e5f15f85e860931b763 (diff)
downloadmemcached-cedaf5586883ffd2323307f45f90f4d70e94ecc2.tar.gz
proxy: backend connection improvement
Improvements to handling of new and failed backend socket connections. Previously connections were initiated immediately, and initially from the config thread, yet completion of opening sockets wouldn't happen until a request tried to use that backend. Now we open connections via the IO thread, as well as validate new connections with a "version\r\n" command. Also fixes a couple of error conditions (parsing, backend disconnect) where clients could hang waiting for a retry time in certain conditions. Now connections should re-establish immediately and dead backends should flip into a bad fast-fail state quicker.
-rw-r--r--memcached.c4
-rw-r--r--proto_proxy.c13
-rw-r--r--proxy.h19
-rw-r--r--proxy_lua.c32
-rw-r--r--proxy_network.c257
5 files changed, 273 insertions, 52 deletions
diff --git a/memcached.c b/memcached.c
index 2806afb..c57c0bd 100644
--- a/memcached.c
+++ b/memcached.c
@@ -5675,7 +5675,9 @@ int main (int argc, char **argv) {
protocol_specified = true;
break;
case PROXY_URING:
- settings.proxy_uring = true;
+ fprintf(stderr, "Proxy io-uring mode is not presently supported\n");
+ return 1;
+ //settings.proxy_uring = true;
break;
#endif
#ifdef MEMCACHED_DEBUG
diff --git a/proto_proxy.c b/proto_proxy.c
index d39b7b1..a654b92 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -146,6 +146,11 @@ void *proxy_init(bool use_uring) {
perror("failed to create backend notify eventfd");
exit(1);
}
+ t->be_event_fd = eventfd(0, EFD_NONBLOCK);
+ if (t->be_event_fd == -1) {
+ perror("failed to create backend notify eventfd");
+ exit(1);
+ }
#else
int fds[2];
if (pipe(fds)) {
@@ -155,11 +160,19 @@ void *proxy_init(bool use_uring) {
t->notify_receive_fd = fds[0];
t->notify_send_fd = fds[1];
+
+ if (pipe(fds)) {
+ perror("can't create proxy backend connection notify pipe");
+ exit(1);
+ }
+ t->be_notify_receive_fd = fds[0];
+ t->be_notify_send_fd = fds[1];
#endif
proxy_init_evthread_events(t);
// incoming request queue.
STAILQ_INIT(&t->io_head_in);
+ STAILQ_INIT(&t->beconn_head_in);
pthread_mutex_init(&t->mutex, NULL);
pthread_cond_init(&t->cond, NULL);
diff --git a/proxy.h b/proxy.h
index f034179..f7647d6 100644
--- a/proxy.h
+++ b/proxy.h
@@ -136,15 +136,6 @@ enum proxy_cmd_types {
CMD_TYPE_META, // m*'s.
};
-enum proxy_be_failures {
- P_BE_FAIL_TIMEOUT = 0,
- P_BE_FAIL_DISCONNECTED,
- P_BE_FAIL_CONNECTING,
- P_BE_FAIL_WRITING,
- P_BE_FAIL_READING,
- P_BE_FAIL_PARSING,
-};
-
typedef struct _io_pending_proxy_t io_pending_proxy_t;
typedef struct proxy_event_thread_s proxy_event_thread_t;
@@ -313,6 +304,7 @@ struct mcp_backend_s {
proxy_event_thread_t *event_thread; // event thread owning this backend.
void *client; // mcmc client
STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
+ STAILQ_ENTRY(mcp_backend_s) beconn_next; // stack for connecting conns
io_head_t io_head; // stack of requests.
char *rbuf; // statically allocated read buffer.
size_t rbufused; // currently active bytes in the buffer
@@ -325,6 +317,7 @@ struct mcp_backend_s {
enum mcp_backend_states state; // readback state machine
int connect_flags; // flags to pass to mcmc_connect
bool connecting; // in the process of an asynch connection.
+ bool validating; // in process of validating a new backend connection.
bool can_write; // recently got a WANT_WRITE or are connecting.
bool stacked; // if backend already queued for syscalls.
bool bad; // timed out, marked as bad.
@@ -333,12 +326,14 @@ struct mcp_backend_s {
char port[MAX_PORTLEN+1];
};
typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t;
+typedef STAILQ_HEAD(beconn_head_s, mcp_backend_s) beconn_head_t;
struct proxy_event_thread_s {
pthread_t thread_id;
struct event_base *base;
struct event notify_event; // listen event for the notify pipe/eventfd.
struct event clock_event; // timer for updating event thread data.
+ struct event beconn_event; // listener for backends in connect state
#ifdef HAVE_LIBURING
struct io_uring ring;
proxy_event_t ur_notify_event; // listen on eventfd.
@@ -350,11 +345,15 @@ struct proxy_event_thread_s {
pthread_cond_t cond; // condition to wait on while stack drains.
io_head_t io_head_in; // inbound requests to process.
be_head_t be_head; // stack of backends for processing.
+ beconn_head_t beconn_head_in; // stack of backends for connection processing.
#ifdef USE_EVENTFD
- int event_fd;
+ int event_fd; // for request ingestion
+ int be_event_fd; // for backend ingestion
#else
int notify_receive_fd;
int notify_send_fd;
+ int be_notify_receive_fd;
+ int be_notify_send_fd;
#endif
proxy_ctx_t *ctx; // main context.
struct proxy_tunables tunables; // periodically copied from main ctx
diff --git a/proxy_lua.c b/proxy_lua.c
index 9d2358f..8531f03 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -168,8 +168,6 @@ static int mcplib_backend(lua_State *L) {
proxy_lua_error(L, "out of memory allocating backend");
return 0;
}
- // TODO (v2): connect elsewhere. When there're multiple backend owners, or
- // sockets per backend, etc. We'll want to kick off connects as use time.
// TODO (v2): no way to change the TCP_KEEPALIVE state post-construction.
// This is a trivial fix if we ensure a backend's owning event thread is
// set before it can be used in the proxy, as it would have access to the
@@ -183,19 +181,25 @@ static int mcplib_backend(lua_State *L) {
}
STAT_UL(ctx);
be->connect_flags = flags;
- int status = mcmc_connect(be->client, be->name, be->port, flags);
- if (status == MCMC_CONNECTED) {
- // FIXME (v2): is this possible? do we ever want to allow blocking
- // connections?
- proxy_lua_ferror(L, "unexpectedly connected to backend early: %s:%s\n", be->name, be->port);
- return 0;
- } else if (status == MCMC_CONNECTING) {
- be->connecting = true;
- be->can_write = false;
- } else {
- proxy_lua_ferror(L, "failed to connect to backend: %s:%s\n", be->name, be->port);
- return 0;
+
+ proxy_event_thread_t *e = ctx->proxy_threads;
+ pthread_mutex_lock(&e->mutex);
+ STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
+ pthread_mutex_unlock(&e->mutex);
+
+ // Signal to check queue.
+#ifdef USE_EVENTFD
+ uint64_t u = 1;
+ // TODO (v2): check result? is it ever possible to get a short write/failure
+ // for an eventfd?
+ if (write(e->be_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ assert(1 == 0);
+ }
+#else
+ if (write(e->be_notify_send_fd, "w", 1) <= 0) {
+ assert(1 == 0);
}
+#endif
luaL_getmetatable(L, "mcp.backend");
lua_setmetatable(L, -2); // set metatable to userdata.
diff --git a/proxy_network.c b/proxy_network.c
index 162c39b..26a0dbf 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -3,11 +3,37 @@
#include "proxy.h"
+enum proxy_be_failures {
+ P_BE_FAIL_TIMEOUT = 0,
+ P_BE_FAIL_DISCONNECTED,
+ P_BE_FAIL_CONNECTING,
+ P_BE_FAIL_READVALIDATE,
+ P_BE_FAIL_BADVALIDATE,
+ P_BE_FAIL_WRITING,
+ P_BE_FAIL_READING,
+ P_BE_FAIL_PARSING,
+};
+
+const char *proxy_be_failure_text[] = {
+ [P_BE_FAIL_TIMEOUT] = "timeout",
+ [P_BE_FAIL_DISCONNECTED] = "disconnected",
+ [P_BE_FAIL_CONNECTING] = "connecting",
+ [P_BE_FAIL_READVALIDATE] = "readvalidate",
+ [P_BE_FAIL_BADVALIDATE] = "badvalidate",
+ [P_BE_FAIL_WRITING] = "writing",
+ [P_BE_FAIL_READING] = "reading",
+ [P_BE_FAIL_PARSING] = "parsing",
+ NULL
+};
+
static void proxy_backend_handler(const int fd, const short which, void *arg);
+static void proxy_beconn_handler(const int fd, const short which, void *arg);
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
+static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg);
static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
static int _flush_pending_write(mcp_backend_t *be);
static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err);
+static void _backend_failed(mcp_backend_t *be);
static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
static int proxy_backend_drive_machine(mcp_backend_t *be);
@@ -163,10 +189,10 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
// should be safe to hold up until their timeout.
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed_ur(be);
- P_DEBUG("%s: backend failed to connect\n", __func__);
+ P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
return;
}
- P_DEBUG("%s: backend connected\n", __func__);
+ P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
be->connecting = false;
be->state = mcp_backend_read;
be->bad = false;
@@ -402,6 +428,58 @@ static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
STAT_UL(ctx);
}
+// event handler for injecting backends for processing
+// currently just for initiating connections the first time.
+static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
+ proxy_event_thread_t *t = arg;
+
+#ifdef USE_EVENTFD
+ uint64_t u;
+ if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ // Temporary error or wasn't actually ready to read somehow.
+ return;
+ }
+#else
+ char buf[1];
+ if (read(fd, buf, 1) != 1) {
+ P_DEBUG("%s: pipe read failed\n", __func__);
+ return;
+ }
+#endif
+
+ beconn_head_t head;
+ struct timeval tmp_time = t->tunables.connect;
+
+ STAILQ_INIT(&head);
+ pthread_mutex_lock(&t->mutex);
+ STAILQ_CONCAT(&head, &t->beconn_head_in);
+ pthread_mutex_unlock(&t->mutex);
+
+ // Think we should reuse this code path for manually instructing backends
+ // to disable/etc but not coding for that generically. We just need to
+ // check the state of the backend when it reaches here or some flags at
+ // least.
+ // FIXME: another ->stacked flag?
+ // Either that or remove the STAILQ code and just using an array of
+ // ptr's.
+ mcp_backend_t *be = NULL;
+ STAILQ_FOREACH(be, &head, beconn_next) {
+ be->event_thread = t;
+ int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
+ if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
+ // if we're already connected for some reason, still push it
+ // through the connection handler to keep the code unified. It
+ // will auto-wake because the socket is writeable.
+ be->connecting = true;
+ be->can_write = false;
+ _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
+ } else {
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
+ _backend_failed(be);
+ }
+ }
+}
+
// event handler for executing backend requests
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
proxy_event_thread_t *t = arg;
@@ -442,13 +520,14 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
int flags = 0;
if (be->connecting) {
- P_DEBUG("%s: deferring IO pending connecting\n", __func__);
+ P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
} else {
flags = _flush_pending_write(be);
}
if (flags == -1) {
_reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
} else {
flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT;
_set_event(be, t->base, flags, tmp_time, proxy_backend_handler);
@@ -760,12 +839,12 @@ static void proxy_backend_retry_handler(const int fd, const short which, void *a
mcp_backend_t *be = arg;
assert(which & EV_TIMEOUT);
struct timeval tmp_time = be->event_thread->tunables.retry;
- _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
}
// currently just for timeouts, but certain errors should consider a backend
// to be "bad" as well.
-// must be called before _reset_bad_backend(), so the backend is currently
+// must be called after _reset_bad_backend(), so the backend is currently
// clear.
// TODO (v2): currently only notes for "bad backends" in cases of timeouts or
// connect failures. We need a specific connect() handler that executes a
@@ -785,20 +864,10 @@ static void _backend_failed(mcp_backend_t *be) {
STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
} else {
STAT_INCR(be->event_thread->ctx, backend_failed, 1);
- _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
}
}
-const char *proxy_be_failure_text[] = {
- [P_BE_FAIL_TIMEOUT] = "timeout",
- [P_BE_FAIL_DISCONNECTED] = "disconnected",
- [P_BE_FAIL_CONNECTING] = "connecting",
- [P_BE_FAIL_WRITING] = "writing",
- [P_BE_FAIL_READING] = "reading",
- [P_BE_FAIL_PARSING] = "parsing",
- NULL
-};
-
// TODO (v2): add a second argument for assigning a specific error to all pending
// IO's (ie; timeout).
// The backend has gotten into a bad state (timed out, protocol desync, or
@@ -932,16 +1001,34 @@ static int _flush_pending_write(mcp_backend_t *be) {
return flags;
}
-// The libevent backend callback handler.
-// If we end up resetting a backend, it will get put back into a connecting
-// state.
-static void proxy_backend_handler(const int fd, const short which, void *arg) {
+static int _beconn_send_validate(mcp_backend_t *be) {
+ const char *str = "version\r\n";
+ const ssize_t len = strlen(str);
+
+ ssize_t res = write(mcmc_fd(be->client), str, len);
+
+ if (res == -1) {
+ return -1;
+ }
+
+ // I'm making an opinionated statement that we should be able to write
+ // "version\r\n" into a fresh socket without hitting EAGAIN.
+ if (res < len) {
+ return -1;
+ }
+
+ return 1;
+}
+
+// Libevent handler for backends in a connecting state.
+static void proxy_beconn_handler(const int fd, const short which, void *arg) {
+ assert(arg != NULL);
mcp_backend_t *be = arg;
int flags = EV_TIMEOUT;
struct timeval tmp_time = be->event_thread->tunables.read;
if (which & EV_TIMEOUT) {
- P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
+ P_DEBUG("%s: backend timed out while connecting\n", __func__);
_reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
_backend_failed(be);
return;
@@ -949,9 +1036,7 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
if (which & EV_WRITE) {
be->can_write = true;
- // TODO (v2): move connect routine to its own function?
- // - hard to do right now because we can't (easily?) edit libevent
- // events.
+
if (be->connecting) {
int err = 0;
// We were connecting, now ensure we're properly connected.
@@ -961,20 +1046,127 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
// should be safe to hold up until their timeout.
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed(be);
- P_DEBUG("%s: backend failed to connect\n", __func__);
+ P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
return;
}
- P_DEBUG("%s: backend connected\n", __func__);
+ P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
be->connecting = false;
be->state = mcp_backend_read;
be->bad = false;
be->failed_count = 0;
+
+ be->validating = true;
+ // TODO: make validation optional.
+
+ if (_beconn_send_validate(be) == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
+ _backend_failed(be);
+ return;
+ } else {
+ // buffer should be empty during validation stage.
+ assert(be->rbufused == 0);
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ }
+ }
+
+ // TODO: currently never taken, until validation is made optional.
+ if (!be->validating) {
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
+ return;
+ }
+ flags |= res;
}
+ }
+
+ if (which & EV_READ) {
+ assert(be->validating);
+
+ int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused, READ_BUFFER_SIZE - be->rbufused, 0);
+ if (read > 0) {
+ mcmc_resp_t r;
+ be->rbufused += read;
+
+ int status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r);
+ if (status == MCMC_ERR) {
+ // Needed more data for a version line, somehow. I feel like
+ // this should set off some alarms, but it is possible.
+ if (r.code == MCMC_WANT_READ) {
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ return;
+ }
+
+ _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
+ _backend_failed(be);
+ return;
+ }
+
+ if (r.code != MCMC_CODE_VERSION) {
+ _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
+ _backend_failed(be);
+ return;
+ }
+
+ be->validating = false;
+ be->rbufused = 0;
+ } else if (read == 0) {
+ // not connected or error.
+ _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ _backend_failed(be);
+ return;
+ } else if (read == -1) {
+ // sit on epoll again.
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ _reset_bad_backend(be, P_BE_FAIL_READING);
+ _backend_failed(be);
+ return;
+ }
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ }
+
+ // Passed validation, don't need to re-read, flush any pending writes.
int res = _flush_pending_write(be);
if (res == -1) {
_reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
return;
}
+ flags |= res;
+ }
+
+ // Still pending requests to read or write.
+ if (!be->validating && !STAILQ_EMPTY(&be->io_head)) {
+ _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler);
+ }
+
+}
+
+// The libevent backend callback handler.
+// If we end up resetting a backend, it will get put back into a connecting
+// state.
+static void proxy_backend_handler(const int fd, const short which, void *arg) {
+ mcp_backend_t *be = arg;
+ int flags = EV_TIMEOUT;
+ struct timeval tmp_time = be->event_thread->tunables.read;
+
+ if (which & EV_TIMEOUT) {
+ P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
+ _reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
+ _backend_failed(be);
+ return;
+ }
+
+ if (which & EV_WRITE) {
+ be->can_write = true;
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
+ return;
+ }
+ flags |= res;
}
if (which & EV_READ) {
@@ -987,16 +1179,19 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
int res = proxy_backend_drive_machine(be);
if (res == -1) {
_reset_bad_backend(be, P_BE_FAIL_PARSING);
+ _backend_failed(be);
return;
}
} else if (read == 0) {
// not connected or error.
_reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ _backend_failed(be);
return;
} else if (read == -1) {
// sit on epoll again.
if (errno != EAGAIN && errno != EWOULDBLOCK) {
_reset_bad_backend(be, P_BE_FAIL_READING);
+ _backend_failed(be);
return;
}
}
@@ -1092,9 +1287,13 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
#ifdef USE_EVENTFD
event_set(&t->notify_event, t->event_fd,
EV_READ | EV_PERSIST, proxy_event_handler, t);
+ event_set(&t->beconn_event, t->be_event_fd,
+ EV_READ | EV_PERSIST, proxy_event_beconn, t);
#else
event_set(&t->notify_event, t->notify_receive_fd,
EV_READ | EV_PERSIST, proxy_event_handler, t);
+ event_set(&t->beconn_event, t->be_notify_receive_fd,
+ EV_READ | EV_PERSIST, proxy_event_beconn, t);
#endif
evtimer_set(&t->clock_event, proxy_event_updater, t);
@@ -1107,7 +1306,11 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
-
+ event_base_set(t->base, &t->beconn_event);
+ if (event_add(&t->beconn_event, 0) == -1) {
+ fprintf(stderr, "Can't monitor libevent notify pipe\n");
+ exit(1);
+ }
}