diff options
-rw-r--r-- | memcached.c | 4 | ||||
-rw-r--r-- | proto_proxy.c | 13 | ||||
-rw-r--r-- | proxy.h | 19 | ||||
-rw-r--r-- | proxy_lua.c | 32 | ||||
-rw-r--r-- | proxy_network.c | 257 |
5 files changed, 273 insertions, 52 deletions
diff --git a/memcached.c b/memcached.c index 2806afb..c57c0bd 100644 --- a/memcached.c +++ b/memcached.c @@ -5675,7 +5675,9 @@ int main (int argc, char **argv) { protocol_specified = true; break; case PROXY_URING: - settings.proxy_uring = true; + fprintf(stderr, "Proxy io-uring mode is not presently supported\n"); + return 1; + //settings.proxy_uring = true; break; #endif #ifdef MEMCACHED_DEBUG diff --git a/proto_proxy.c b/proto_proxy.c index d39b7b1..a654b92 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -146,6 +146,11 @@ void *proxy_init(bool use_uring) { perror("failed to create backend notify eventfd"); exit(1); } + t->be_event_fd = eventfd(0, EFD_NONBLOCK); + if (t->be_event_fd == -1) { + perror("failed to create backend notify eventfd"); + exit(1); + } #else int fds[2]; if (pipe(fds)) { @@ -155,11 +160,19 @@ void *proxy_init(bool use_uring) { t->notify_receive_fd = fds[0]; t->notify_send_fd = fds[1]; + + if (pipe(fds)) { + perror("can't create proxy backend connection notify pipe"); + exit(1); + } + t->be_notify_receive_fd = fds[0]; + t->be_notify_send_fd = fds[1]; #endif proxy_init_evthread_events(t); // incoming request queue. STAILQ_INIT(&t->io_head_in); + STAILQ_INIT(&t->beconn_head_in); pthread_mutex_init(&t->mutex, NULL); pthread_cond_init(&t->cond, NULL); @@ -136,15 +136,6 @@ enum proxy_cmd_types { CMD_TYPE_META, // m*'s. }; -enum proxy_be_failures { - P_BE_FAIL_TIMEOUT = 0, - P_BE_FAIL_DISCONNECTED, - P_BE_FAIL_CONNECTING, - P_BE_FAIL_WRITING, - P_BE_FAIL_READING, - P_BE_FAIL_PARSING, -}; - typedef struct _io_pending_proxy_t io_pending_proxy_t; typedef struct proxy_event_thread_s proxy_event_thread_t; @@ -313,6 +304,7 @@ struct mcp_backend_s { proxy_event_thread_t *event_thread; // event thread owning this backend. void *client; // mcmc client STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends + STAILQ_ENTRY(mcp_backend_s) beconn_next; // stack for connecting conns io_head_t io_head; // stack of requests. char *rbuf; // statically allocated read buffer. size_t rbufused; // currently active bytes in the buffer @@ -325,6 +317,7 @@ struct mcp_backend_s { enum mcp_backend_states state; // readback state machine int connect_flags; // flags to pass to mcmc_connect bool connecting; // in the process of an asynch connection. + bool validating; // in process of validating a new backend connection. bool can_write; // recently got a WANT_WRITE or are connecting. bool stacked; // if backend already queued for syscalls. bool bad; // timed out, marked as bad. @@ -333,12 +326,14 @@ struct mcp_backend_s { char port[MAX_PORTLEN+1]; }; typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t; +typedef STAILQ_HEAD(beconn_head_s, mcp_backend_s) beconn_head_t; struct proxy_event_thread_s { pthread_t thread_id; struct event_base *base; struct event notify_event; // listen event for the notify pipe/eventfd. struct event clock_event; // timer for updating event thread data. + struct event beconn_event; // listener for backends in connect state #ifdef HAVE_LIBURING struct io_uring ring; proxy_event_t ur_notify_event; // listen on eventfd. @@ -350,11 +345,15 @@ struct proxy_event_thread_s { pthread_cond_t cond; // condition to wait on while stack drains. io_head_t io_head_in; // inbound requests to process. be_head_t be_head; // stack of backends for processing. + beconn_head_t beconn_head_in; // stack of backends for connection processing. #ifdef USE_EVENTFD - int event_fd; + int event_fd; // for request ingestion + int be_event_fd; // for backend ingestion #else int notify_receive_fd; int notify_send_fd; + int be_notify_receive_fd; + int be_notify_send_fd; #endif proxy_ctx_t *ctx; // main context. struct proxy_tunables tunables; // periodically copied from main ctx diff --git a/proxy_lua.c b/proxy_lua.c index 9d2358f..8531f03 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -168,8 +168,6 @@ static int mcplib_backend(lua_State *L) { proxy_lua_error(L, "out of memory allocating backend"); return 0; } - // TODO (v2): connect elsewhere. When there're multiple backend owners, or - // sockets per backend, etc. We'll want to kick off connects as use time. // TODO (v2): no way to change the TCP_KEEPALIVE state post-construction. // This is a trivial fix if we ensure a backend's owning event thread is // set before it can be used in the proxy, as it would have access to the @@ -183,19 +181,25 @@ static int mcplib_backend(lua_State *L) { } STAT_UL(ctx); be->connect_flags = flags; - int status = mcmc_connect(be->client, be->name, be->port, flags); - if (status == MCMC_CONNECTED) { - // FIXME (v2): is this possible? do we ever want to allow blocking - // connections? - proxy_lua_ferror(L, "unexpectedly connected to backend early: %s:%s\n", be->name, be->port); - return 0; - } else if (status == MCMC_CONNECTING) { - be->connecting = true; - be->can_write = false; - } else { - proxy_lua_ferror(L, "failed to connect to backend: %s:%s\n", be->name, be->port); - return 0; + + proxy_event_thread_t *e = ctx->proxy_threads; + pthread_mutex_lock(&e->mutex); + STAILQ_INSERT_TAIL(&e->beconn_head_in, be, beconn_next); + pthread_mutex_unlock(&e->mutex); + + // Signal to check queue. +#ifdef USE_EVENTFD + uint64_t u = 1; + // TODO (v2): check result? is it ever possible to get a short write/failure + // for an eventfd? + if (write(e->be_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) { + assert(1 == 0); + } +#else + if (write(e->be_notify_send_fd, "w", 1) <= 0) { + assert(1 == 0); } +#endif luaL_getmetatable(L, "mcp.backend"); lua_setmetatable(L, -2); // set metatable to userdata. diff --git a/proxy_network.c b/proxy_network.c index 162c39b..26a0dbf 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -3,11 +3,37 @@ #include "proxy.h" +enum proxy_be_failures { + P_BE_FAIL_TIMEOUT = 0, + P_BE_FAIL_DISCONNECTED, + P_BE_FAIL_CONNECTING, + P_BE_FAIL_READVALIDATE, + P_BE_FAIL_BADVALIDATE, + P_BE_FAIL_WRITING, + P_BE_FAIL_READING, + P_BE_FAIL_PARSING, +}; + +const char *proxy_be_failure_text[] = { + [P_BE_FAIL_TIMEOUT] = "timeout", + [P_BE_FAIL_DISCONNECTED] = "disconnected", + [P_BE_FAIL_CONNECTING] = "connecting", + [P_BE_FAIL_READVALIDATE] = "readvalidate", + [P_BE_FAIL_BADVALIDATE] = "badvalidate", + [P_BE_FAIL_WRITING] = "writing", + [P_BE_FAIL_READING] = "reading", + [P_BE_FAIL_PARSING] = "parsing", + NULL +}; + static void proxy_backend_handler(const int fd, const short which, void *arg); +static void proxy_beconn_handler(const int fd, const short which, void *arg); static void proxy_event_handler(evutil_socket_t fd, short which, void *arg); +static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg); static void proxy_event_updater(evutil_socket_t fd, short which, void *arg); static int _flush_pending_write(mcp_backend_t *be); static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err); +static void _backend_failed(mcp_backend_t *be); static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback); static int proxy_backend_drive_machine(mcp_backend_t *be); @@ -163,10 +189,10 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) { // should be safe to hold up until their timeout. _reset_bad_backend(be, P_BE_FAIL_CONNECTING); _backend_failed_ur(be); - P_DEBUG("%s: backend failed to connect\n", __func__); + P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port); return; } - P_DEBUG("%s: backend connected\n", __func__); + P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port); be->connecting = false; be->state = mcp_backend_read; be->bad = false; @@ -402,6 +428,58 @@ static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) { STAT_UL(ctx); } +// event handler for injecting backends for processing +// currently just for initiating connections the first time. +static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) { + proxy_event_thread_t *t = arg; + +#ifdef USE_EVENTFD + uint64_t u; + if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) { + // Temporary error or wasn't actually ready to read somehow. + return; + } +#else + char buf[1]; + if (read(fd, buf, 1) != 1) { + P_DEBUG("%s: pipe read failed\n", __func__); + return; + } +#endif + + beconn_head_t head; + struct timeval tmp_time = t->tunables.connect; + + STAILQ_INIT(&head); + pthread_mutex_lock(&t->mutex); + STAILQ_CONCAT(&head, &t->beconn_head_in); + pthread_mutex_unlock(&t->mutex); + + // Think we should reuse this code path for manually instructing backends + // to disable/etc but not coding for that generically. We just need to + // check the state of the backend when it reaches here or some flags at + // least. + // FIXME: another ->stacked flag? + // Either that or remove the STAILQ code and just using an array of + // ptr's. + mcp_backend_t *be = NULL; + STAILQ_FOREACH(be, &head, beconn_next) { + be->event_thread = t; + int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); + if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) { + // if we're already connected for some reason, still push it + // through the connection handler to keep the code unified. It + // will auto-wake because the socket is writeable. + be->connecting = true; + be->can_write = false; + _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); + } else { + _reset_bad_backend(be, P_BE_FAIL_CONNECTING); + _backend_failed(be); + } + } +} + // event handler for executing backend requests static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { proxy_event_thread_t *t = arg; @@ -442,13 +520,14 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { int flags = 0; if (be->connecting) { - P_DEBUG("%s: deferring IO pending connecting\n", __func__); + P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port); } else { flags = _flush_pending_write(be); } if (flags == -1) { _reset_bad_backend(be, P_BE_FAIL_WRITING); + _backend_failed(be); } else { flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT; _set_event(be, t->base, flags, tmp_time, proxy_backend_handler); @@ -760,12 +839,12 @@ static void proxy_backend_retry_handler(const int fd, const short which, void *a mcp_backend_t *be = arg; assert(which & EV_TIMEOUT); struct timeval tmp_time = be->event_thread->tunables.retry; - _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler); + _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); } // currently just for timeouts, but certain errors should consider a backend // to be "bad" as well. -// must be called before _reset_bad_backend(), so the backend is currently +// must be called after _reset_bad_backend(), so the backend is currently // clear. // TODO (v2): currently only notes for "bad backends" in cases of timeouts or // connect failures. We need a specific connect() handler that executes a @@ -785,20 +864,10 @@ static void _backend_failed(mcp_backend_t *be) { STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1); } else { STAT_INCR(be->event_thread->ctx, backend_failed, 1); - _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler); + _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); } } -const char *proxy_be_failure_text[] = { - [P_BE_FAIL_TIMEOUT] = "timeout", - [P_BE_FAIL_DISCONNECTED] = "disconnected", - [P_BE_FAIL_CONNECTING] = "connecting", - [P_BE_FAIL_WRITING] = "writing", - [P_BE_FAIL_READING] = "reading", - [P_BE_FAIL_PARSING] = "parsing", - NULL -}; - // TODO (v2): add a second argument for assigning a specific error to all pending // IO's (ie; timeout). // The backend has gotten into a bad state (timed out, protocol desync, or @@ -932,16 +1001,34 @@ static int _flush_pending_write(mcp_backend_t *be) { return flags; } -// The libevent backend callback handler. -// If we end up resetting a backend, it will get put back into a connecting -// state. -static void proxy_backend_handler(const int fd, const short which, void *arg) { +static int _beconn_send_validate(mcp_backend_t *be) { + const char *str = "version\r\n"; + const ssize_t len = strlen(str); + + ssize_t res = write(mcmc_fd(be->client), str, len); + + if (res == -1) { + return -1; + } + + // I'm making an opinionated statement that we should be able to write + // "version\r\n" into a fresh socket without hitting EAGAIN. + if (res < len) { + return -1; + } + + return 1; +} + +// Libevent handler for backends in a connecting state. +static void proxy_beconn_handler(const int fd, const short which, void *arg) { + assert(arg != NULL); mcp_backend_t *be = arg; int flags = EV_TIMEOUT; struct timeval tmp_time = be->event_thread->tunables.read; if (which & EV_TIMEOUT) { - P_DEBUG("%s: timeout received, killing backend queue\n", __func__); + P_DEBUG("%s: backend timed out while connecting\n", __func__); _reset_bad_backend(be, P_BE_FAIL_TIMEOUT); _backend_failed(be); return; @@ -949,9 +1036,7 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) { if (which & EV_WRITE) { be->can_write = true; - // TODO (v2): move connect routine to its own function? - // - hard to do right now because we can't (easily?) edit libevent - // events. + if (be->connecting) { int err = 0; // We were connecting, now ensure we're properly connected. @@ -961,20 +1046,127 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) { // should be safe to hold up until their timeout. _reset_bad_backend(be, P_BE_FAIL_CONNECTING); _backend_failed(be); - P_DEBUG("%s: backend failed to connect\n", __func__); + P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port); return; } - P_DEBUG("%s: backend connected\n", __func__); + P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port); be->connecting = false; be->state = mcp_backend_read; be->bad = false; be->failed_count = 0; + + be->validating = true; + // TODO: make validation optional. + + if (_beconn_send_validate(be) == -1) { + _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE); + _backend_failed(be); + return; + } else { + // buffer should be empty during validation stage. + assert(be->rbufused == 0); + _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); + } + } + + // TODO: currently never taken, until validation is made optional. + if (!be->validating) { + int res = _flush_pending_write(be); + if (res == -1) { + _reset_bad_backend(be, P_BE_FAIL_WRITING); + _backend_failed(be); + return; + } + flags |= res; } + } + + if (which & EV_READ) { + assert(be->validating); + + int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused, READ_BUFFER_SIZE - be->rbufused, 0); + if (read > 0) { + mcmc_resp_t r; + be->rbufused += read; + + int status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r); + if (status == MCMC_ERR) { + // Needed more data for a version line, somehow. I feel like + // this should set off some alarms, but it is possible. + if (r.code == MCMC_WANT_READ) { + _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); + return; + } + + _reset_bad_backend(be, P_BE_FAIL_READVALIDATE); + _backend_failed(be); + return; + } + + if (r.code != MCMC_CODE_VERSION) { + _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE); + _backend_failed(be); + return; + } + + be->validating = false; + be->rbufused = 0; + } else if (read == 0) { + // not connected or error. + _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED); + _backend_failed(be); + return; + } else if (read == -1) { + // sit on epoll again. + if (errno != EAGAIN && errno != EWOULDBLOCK) { + _reset_bad_backend(be, P_BE_FAIL_READING); + _backend_failed(be); + return; + } + _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); + } + + // Passed validation, don't need to re-read, flush any pending writes. int res = _flush_pending_write(be); if (res == -1) { _reset_bad_backend(be, P_BE_FAIL_WRITING); + _backend_failed(be); return; } + flags |= res; + } + + // Still pending requests to read or write. + if (!be->validating && !STAILQ_EMPTY(&be->io_head)) { + _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler); + } + +} + +// The libevent backend callback handler. +// If we end up resetting a backend, it will get put back into a connecting +// state. +static void proxy_backend_handler(const int fd, const short which, void *arg) { + mcp_backend_t *be = arg; + int flags = EV_TIMEOUT; + struct timeval tmp_time = be->event_thread->tunables.read; + + if (which & EV_TIMEOUT) { + P_DEBUG("%s: timeout received, killing backend queue\n", __func__); + _reset_bad_backend(be, P_BE_FAIL_TIMEOUT); + _backend_failed(be); + return; + } + + if (which & EV_WRITE) { + be->can_write = true; + int res = _flush_pending_write(be); + if (res == -1) { + _reset_bad_backend(be, P_BE_FAIL_WRITING); + _backend_failed(be); + return; + } + flags |= res; } if (which & EV_READ) { @@ -987,16 +1179,19 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) { int res = proxy_backend_drive_machine(be); if (res == -1) { _reset_bad_backend(be, P_BE_FAIL_PARSING); + _backend_failed(be); return; } } else if (read == 0) { // not connected or error. _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED); + _backend_failed(be); return; } else if (read == -1) { // sit on epoll again. if (errno != EAGAIN && errno != EWOULDBLOCK) { _reset_bad_backend(be, P_BE_FAIL_READING); + _backend_failed(be); return; } } @@ -1092,9 +1287,13 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) { #ifdef USE_EVENTFD event_set(&t->notify_event, t->event_fd, EV_READ | EV_PERSIST, proxy_event_handler, t); + event_set(&t->beconn_event, t->be_event_fd, + EV_READ | EV_PERSIST, proxy_event_beconn, t); #else event_set(&t->notify_event, t->notify_receive_fd, EV_READ | EV_PERSIST, proxy_event_handler, t); + event_set(&t->beconn_event, t->be_notify_receive_fd, + EV_READ | EV_PERSIST, proxy_event_beconn, t); #endif evtimer_set(&t->clock_event, proxy_event_updater, t); @@ -1107,7 +1306,11 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) { fprintf(stderr, "Can't monitor libevent notify pipe\n"); exit(1); } - + event_base_set(t->base, &t->beconn_event); + if (event_add(&t->beconn_event, 0) == -1) { + fprintf(stderr, "Can't monitor libevent notify pipe\n"); + exit(1); + } } |