summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- memcached.c 4
-rw-r--r-- proto_proxy.c 13
-rw-r--r-- proxy.h 19
-rw-r--r-- proxy_lua.c 32
-rw-r--r-- proxy_network.c 257
5 files changed, 273 insertions, 52 deletions
diff --git a/memcached.c b/memcached.c
index 2806afb..c57c0bd 100644
--- a/memcached.c
+++ b/memcached.c
@@ -5675,7 +5675,9 @@ int main (int argc, char **argv) {
protocol_specified = true;
break;
case PROXY_URING:
- settings.proxy_uring = true;
+ fprintf(stderr, "Proxy io-uring mode is not presently supported\n");
+ return 1;
+ //settings.proxy_uring = true;
break;
#endif
#ifdef MEMCACHED_DEBUG
diff --git a/proto_proxy.c b/proto_proxy.c
index d39b7b1..a654b92 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -146,6 +146,11 @@ void *proxy_init(bool use_uring) {
perror("failed to create backend notify eventfd");
exit(1);
}
+ t->be_event_fd = eventfd(0, EFD_NONBLOCK);
+ if (t->be_event_fd == -1) {
+ perror("failed to create backend notify eventfd");
+ exit(1);
+ }
#else
int fds[2];
if (pipe(fds)) {
@@ -155,11 +160,19 @@ void *proxy_init(bool use_uring) {
t->notify_receive_fd = fds[0];
t->notify_send_fd = fds[1];
+
+ if (pipe(fds)) {
+ perror("can't create proxy backend connection notify pipe");
+ exit(1);
+ }
+ t->be_notify_receive_fd = fds[0];
+ t->be_notify_send_fd = fds[1];
#endif
proxy_init_evthread_events(t);
// incoming request queue.
STAILQ_INIT(&t->io_head_in);
+ STAILQ_INIT(&t->beconn_head_in);
pthread_mutex_init(&t->mutex, NULL);
pthread_cond_init(&t->cond, NULL);
diff --git a/proxy.h b/proxy.h
index f034179..f7647d6 100644
--- a/proxy.h
+++ b/proxy.h
@@ -136,15 +136,6 @@ enum proxy_cmd_types {
CMD_TYPE_META, // m*'s.
};
-enum proxy_be_failures {
- P_BE_FAIL_TIMEOUT = 0,
- P_BE_FAIL_DISCONNECTED,
- P_BE_FAIL_CONNECTING,
- P_BE_FAIL_WRITING,
- P_BE_FAIL_READING,
- P_BE_FAIL_PARSING,
-};
-
typedef struct _io_pending_proxy_t io_pending_proxy_t;
typedef struct proxy_event_thread_s proxy_event_thread_t;
@@ -313,6 +304,7 @@ struct mcp_backend_s {
proxy_event_thread_t *event_thread; // event thread owning this backend.
void *client; // mcmc client
STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
+ STAILQ_ENTRY(mcp_backend_s) beconn_next; // stack for connecting conns
io_head_t io_head; // stack of requests.
char *rbuf; // statically allocated read buffer.
size_t rbufused; // currently active bytes in the buffer
@@ -325,6 +317,7 @@ struct mcp_backend_s {
enum mcp_backend_states state; // readback state machine
int connect_flags; // flags to pass to mcmc_connect
bool connecting; // in the process of an asynch connection.
+ bool validating; // in process of validating a new backend connection.
bool can_write; // recently got a WANT_WRITE or are connecting.
bool stacked; // if backend already queued for syscalls.
bool bad; // timed out, marked as bad.
@@ -333,12 +326,14 @@ struct mcp_backend_s {
char port[MAX_PORTLEN+1];
};
typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t;
+typedef STAILQ_HEAD(beconn_head_s, mcp_backend_s) beconn_head_t;
struct proxy_event_thread_s {
pthread_t thread_id;
struct event_base *base;
struct event notify_event; // listen event for the notify pipe/eventfd.
struct event clock_event; // timer for updating event thread data.
+ struct event beconn_event; // listener for backends in connect state
#ifdef HAVE_LIBURING
struct io_uring ring;
proxy_event_t ur_notify_event; // listen on eventfd.
@@ -350,11 +345,15 @@ struct proxy_event_thread_s {
pthread_cond_t cond; // condition to wait on while stack drains.
io_head_t io_head_in; // inbound requests to process.
be_head_t be_head; // stack of backends for processing.
+ beconn_head_t beconn_head_in; // stack of backends for connection processing.
#ifdef USE_EVENTFD
- int event_fd;
+ int event_fd; // for request ingestion
+ int be_event_fd; // for backend ingestion
#else
int notify_receive_fd;
int notify_send_fd;
+ int be_notify_receive_fd;
+ int be_notify_send_fd;
#endif
proxy_ctx_t *ctx; // main context.
struct proxy_tunables tunables; // periodically copied from main ctx
diff --git a/proxy_lua.c b/proxy_lua.c
index 9d2358f..8531f03 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -168,8 +168,6 @@ static int mcplib_backend(lua_State *L) {
proxy_lua_error(L, "out of memory allocating backend");
return 0;
}
- // TODO (v2): connect elsewhere. When there're multiple backend owners, or
- // sockets per backend, etc. We'll want to kick off connects as use time.
// TODO (v2): no way to change the TCP_KEEPALIVE state post-construction.
// This is a trivial fix if we ensure a backend's owning event thread is
// set before it can be used in the proxy, as it would have access to the
@@ -183,19 +181,25 @@ static int mcplib_backend(lua_State *L) {
}
STAT_UL(ctx);
be->connect_flags = flags;
- int status = mcmc_connect(be->client, be->name, be->port, flags);
- if (status == MCMC_CONNECTED) {
- // FIXME (v2): is this possible? do we ever want to allow blocking
- // connections?
- proxy_lua_ferror(L, "unexpectedly connected to backend early: %s:%s\n", be->name, be->port);
- return 0;
- } else if (status == MCMC_CONNECTING) {
- be->connecting = true;
- be->can_write = false;
- } else {
- proxy_lua_ferror(L, "failed to connect to backend: %s:%s\n", be->name, be->port);
- return 0;
+
+ proxy_event_thread_t *e = ctx->proxy_threads;
+ pthread_mutex_lock(&e->mutex);
+ STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next);
+ pthread_mutex_unlock(&e->mutex);
+
+ // Signal to check queue.
+#ifdef USE_EVENTFD
+ uint64_t u = 1;
+ // TODO (v2): check result? is it ever possible to get a short write/failure
+ // for an eventfd?
+ if (write(e->be_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ assert(1 == 0);
+ }
+#else
+ if (write(e->be_notify_send_fd, "w", 1) <= 0) {
+ assert(1 == 0);
}
+#endif
luaL_getmetatable(L, "mcp.backend");
lua_setmetatable(L, -2); // set metatable to userdata.
diff --git a/proxy_network.c b/proxy_network.c
index 162c39b..26a0dbf 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -3,11 +3,37 @@
#include "proxy.h"
+enum proxy_be_failures { // reason codes passed to _reset_bad_backend(); values also index proxy_be_failure_text[]
+ P_BE_FAIL_TIMEOUT = 0, // libevent delivered EV_TIMEOUT for the backend's event
+ P_BE_FAIL_DISCONNECTED, // recv() returned 0
+ P_BE_FAIL_CONNECTING, // mcmc_connect() failed outright, or the pending connect did not complete successfully
+ P_BE_FAIL_READVALIDATE, // mcmc_parse_buf() returned MCMC_ERR (other than MCMC_WANT_READ) on the validation reply
+ P_BE_FAIL_BADVALIDATE, // validation request could not be sent, or the reply code was not MCMC_CODE_VERSION
+ P_BE_FAIL_WRITING, // _flush_pending_write() returned -1
+ P_BE_FAIL_READING, // recv() returned -1 with errno other than EAGAIN/EWOULDBLOCK
+ P_BE_FAIL_PARSING, // proxy_backend_drive_machine() returned -1
+};
+
+const char *proxy_be_failure_text[] = { // short text label for each enum proxy_be_failures value, indexed by that value
+ [P_BE_FAIL_TIMEOUT] = "timeout",
+ [P_BE_FAIL_DISCONNECTED] = "disconnected",
+ [P_BE_FAIL_CONNECTING] = "connecting",
+ [P_BE_FAIL_READVALIDATE] = "readvalidate",
+ [P_BE_FAIL_BADVALIDATE] = "badvalidate",
+ [P_BE_FAIL_WRITING] = "writing",
+ [P_BE_FAIL_READING] = "reading",
+ [P_BE_FAIL_PARSING] = "parsing",
+ NULL // positional initializer: lands in the slot right after P_BE_FAIL_PARSING, terminating the table
+};
+
static void proxy_backend_handler(const int fd, const short which, void *arg);
+static void proxy_beconn_handler(const int fd, const short which, void *arg);
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
+static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg);
static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
static int _flush_pending_write(mcp_backend_t *be);
static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err);
+static void _backend_failed(mcp_backend_t *be);
static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
static int proxy_backend_drive_machine(mcp_backend_t *be);
@@ -163,10 +189,10 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
// should be safe to hold up until their timeout.
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed_ur(be);
- P_DEBUG("%s: backend failed to connect\n", __func__);
+ P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
return;
}
- P_DEBUG("%s: backend connected\n", __func__);
+ P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
be->connecting = false;
be->state = mcp_backend_read;
be->bad = false;
@@ -402,6 +428,58 @@ static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
STAT_UL(ctx);
}
+// Libevent callback for the backend-connection notify fd (eventfd or pipe).
+// Drains t->beconn_head_in and starts mcmc_connect() for every backend queued there.
+static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
+ proxy_event_thread_t *t = arg; // the event thread; registered as the callback arg in proxy_init_evthread_events()
+
+#ifdef USE_EVENTFD
+ uint64_t u; // value is read only to consume the wakeup; it is never inspected
+ if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ // Temporary error or wasn't actually ready to read somehow.
+ return;
+ }
+#else
+ char buf[1]; // a single byte is consumed per callback invocation
+ if (read(fd, buf, 1) != 1) {
+ P_DEBUG("%s: pipe read failed\n", __func__);
+ return;
+ }
+#endif
+
+ beconn_head_t head; // local list so t->mutex is held only for the splice below
+ struct timeval tmp_time = t->tunables.connect; // timeout applied to each connect event
+
+ STAILQ_INIT(&head);
+ pthread_mutex_lock(&t->mutex);
+ STAILQ_CONCAT(&head, &t->beconn_head_in); // move everything queued so far onto the local list
+ pthread_mutex_unlock(&t->mutex);
+
+ // This code path could probably be reused for manually instructing
+ // backends to disable/etc, but it is not written generically for that.
+ // We would just need to check the state of the backend, or at least some
+ // flags, when it reaches here.
+ // FIXME: another ->stacked flag?
+ // Either that or remove the STAILQ code and just use an array of
+ // pointers.
+ mcp_backend_t *be = NULL;
+ STAILQ_FOREACH(be, &head, beconn_next) {
+ be->event_thread = t; // record this thread as the backend's owning event thread
+ int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
+ if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
+ // if we're already connected for some reason, still push it
+ // through the connection handler to keep the code unified. It
+ // will auto-wake because the socket is writeable.
+ be->connecting = true;
+ be->can_write = false;
+ _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
+ } else {
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING); // any other status is treated as a connect failure
+ _backend_failed(be);
+ }
+ }
+}
+
// event handler for executing backend requests
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
proxy_event_thread_t *t = arg;
@@ -442,13 +520,14 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
int flags = 0;
if (be->connecting) {
- P_DEBUG("%s: deferring IO pending connecting\n", __func__);
+ P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
} else {
flags = _flush_pending_write(be);
}
if (flags == -1) {
_reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
} else {
flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT;
_set_event(be, t->base, flags, tmp_time, proxy_backend_handler);
@@ -760,12 +839,12 @@ static void proxy_backend_retry_handler(const int fd, const short which, void *a
mcp_backend_t *be = arg;
assert(which & EV_TIMEOUT);
struct timeval tmp_time = be->event_thread->tunables.retry;
- _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
}
// currently just for timeouts, but certain errors should consider a backend
// to be "bad" as well.
-// must be called before _reset_bad_backend(), so the backend is currently
+// must be called after _reset_bad_backend(), so the backend is currently
// clear.
// TODO (v2): currently only notes for "bad backends" in cases of timeouts or
// connect failures. We need a specific connect() handler that executes a
@@ -785,20 +864,10 @@ static void _backend_failed(mcp_backend_t *be) {
STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
} else {
STAT_INCR(be->event_thread->ctx, backend_failed, 1);
- _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
}
}
-const char *proxy_be_failure_text[] = {
- [P_BE_FAIL_TIMEOUT] = "timeout",
- [P_BE_FAIL_DISCONNECTED] = "disconnected",
- [P_BE_FAIL_CONNECTING] = "connecting",
- [P_BE_FAIL_WRITING] = "writing",
- [P_BE_FAIL_READING] = "reading",
- [P_BE_FAIL_PARSING] = "parsing",
- NULL
-};
-
// TODO (v2): add a second argument for assigning a specific error to all pending
// IO's (ie; timeout).
// The backend has gotten into a bad state (timed out, protocol desync, or
@@ -932,16 +1001,34 @@ static int _flush_pending_write(mcp_backend_t *be) {
return flags;
}
-// The libevent backend callback handler.
-// If we end up resetting a backend, it will get put back into a connecting
-// state.
-static void proxy_backend_handler(const int fd, const short which, void *arg) {
+static int _beconn_send_validate(mcp_backend_t *be) { // writes "version\r\n" to the backend's fd; returns 1 if fully written, -1 otherwise
+ const char *str = "version\r\n";
+ const ssize_t len = strlen(str);
+
+ ssize_t res = write(mcmc_fd(be->client), str, len);
+
+ if (res == -1) { // write() itself failed; errno is not examined here
+ return -1;
+ }
+
+ // I'm making an opinionated statement that we should be able to write
+ // "version\r\n" into a fresh socket without hitting EAGAIN.
+ if (res < len) { // a short write is reported as failure rather than retried
+ return -1;
+ }
+
+ return 1;
+}
+
+// Libevent handler for backends in a connecting state.
+static void proxy_beconn_handler(const int fd, const short which, void *arg) {
+ assert(arg != NULL);
mcp_backend_t *be = arg;
int flags = EV_TIMEOUT;
struct timeval tmp_time = be->event_thread->tunables.read;
if (which & EV_TIMEOUT) {
- P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
+ P_DEBUG("%s: backend timed out while connecting\n", __func__);
_reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
_backend_failed(be);
return;
@@ -949,9 +1036,7 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
if (which & EV_WRITE) {
be->can_write = true;
- // TODO (v2): move connect routine to its own function?
- // - hard to do right now because we can't (easily?) edit libevent
- // events.
+
if (be->connecting) {
int err = 0;
// We were connecting, now ensure we're properly connected.
@@ -961,20 +1046,127 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
// should be safe to hold up until their timeout.
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed(be);
- P_DEBUG("%s: backend failed to connect\n", __func__);
+ P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
return;
}
- P_DEBUG("%s: backend connected\n", __func__);
+ P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
be->connecting = false;
be->state = mcp_backend_read;
be->bad = false;
be->failed_count = 0;
+
+ be->validating = true;
+ // TODO: make validation optional.
+
+ if (_beconn_send_validate(be) == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
+ _backend_failed(be);
+ return;
+ } else {
+ // buffer should be empty during validation stage.
+ assert(be->rbufused == 0);
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ }
+ }
+
+ // TODO: currently never taken, until validation is made optional.
+ if (!be->validating) {
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
+ return;
+ }
+ flags |= res;
}
+ }
+
+ if (which & EV_READ) {
+ assert(be->validating);
+
+ int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused, READ_BUFFER_SIZE - be->rbufused, 0);
+ if (read > 0) {
+ mcmc_resp_t r;
+ be->rbufused += read;
+
+ int status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r);
+ if (status == MCMC_ERR) {
+ // Needed more data for a version line, somehow. I feel like
+ // this should set off some alarms, but it is possible.
+ if (r.code == MCMC_WANT_READ) {
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ return;
+ }
+
+ _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
+ _backend_failed(be);
+ return;
+ }
+
+ if (r.code != MCMC_CODE_VERSION) {
+ _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
+ _backend_failed(be);
+ return;
+ }
+
+ be->validating = false;
+ be->rbufused = 0;
+ } else if (read == 0) {
+ // not connected or error.
+ _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ _backend_failed(be);
+ return;
+ } else if (read == -1) {
+ // sit on epoll again.
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ _reset_bad_backend(be, P_BE_FAIL_READING);
+ _backend_failed(be);
+ return;
+ }
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ }
+
+ // Passed validation, don't need to re-read, flush any pending writes.
int res = _flush_pending_write(be);
if (res == -1) {
_reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
return;
}
+ flags |= res;
+ }
+
+ // Still pending requests to read or write.
+ if (!be->validating && !STAILQ_EMPTY(&be->io_head)) {
+ _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler);
+ }
+
+}
+
+// The libevent backend callback handler.
+// If we end up resetting a backend, it will get put back into a connecting
+// state.
+static void proxy_backend_handler(const int fd, const short which, void *arg) {
+ mcp_backend_t *be = arg;
+ int flags = EV_TIMEOUT;
+ struct timeval tmp_time = be->event_thread->tunables.read;
+
+ if (which & EV_TIMEOUT) {
+ P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
+ _reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
+ _backend_failed(be);
+ return;
+ }
+
+ if (which & EV_WRITE) {
+ be->can_write = true;
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
+ return;
+ }
+ flags |= res;
}
if (which & EV_READ) {
@@ -987,16 +1179,19 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
int res = proxy_backend_drive_machine(be);
if (res == -1) {
_reset_bad_backend(be, P_BE_FAIL_PARSING);
+ _backend_failed(be);
return;
}
} else if (read == 0) {
// not connected or error.
_reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ _backend_failed(be);
return;
} else if (read == -1) {
// sit on epoll again.
if (errno != EAGAIN && errno != EWOULDBLOCK) {
_reset_bad_backend(be, P_BE_FAIL_READING);
+ _backend_failed(be);
return;
}
}
@@ -1092,9 +1287,13 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
#ifdef USE_EVENTFD
event_set(&t->notify_event, t->event_fd,
EV_READ | EV_PERSIST, proxy_event_handler, t);
+ event_set(&t->beconn_event, t->be_event_fd,
+ EV_READ | EV_PERSIST, proxy_event_beconn, t);
#else
event_set(&t->notify_event, t->notify_receive_fd,
EV_READ | EV_PERSIST, proxy_event_handler, t);
+ event_set(&t->beconn_event, t->be_notify_receive_fd,
+ EV_READ | EV_PERSIST, proxy_event_beconn, t);
#endif
evtimer_set(&t->clock_event, proxy_event_updater, t);
@@ -1107,7 +1306,11 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
-
+ event_base_set(t->base, &t->beconn_event);
+ if (event_add(&t->beconn_event, 0) == -1) {
+ fprintf(stderr, "Can't monitor libevent notify pipe\n");
+ exit(1);
+ }
}