diff options
Diffstat (limited to 'proxy_network.c')
-rw-r--r-- | proxy_network.c | 257 |
1 files changed, 230 insertions, 27 deletions
diff --git a/proxy_network.c b/proxy_network.c index 162c39b..26a0dbf 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -3,11 +3,37 @@ #include "proxy.h" +enum proxy_be_failures { + P_BE_FAIL_TIMEOUT = 0, + P_BE_FAIL_DISCONNECTED, + P_BE_FAIL_CONNECTING, + P_BE_FAIL_READVALIDATE, + P_BE_FAIL_BADVALIDATE, + P_BE_FAIL_WRITING, + P_BE_FAIL_READING, + P_BE_FAIL_PARSING, +}; + +const char *proxy_be_failure_text[] = { + [P_BE_FAIL_TIMEOUT] = "timeout", + [P_BE_FAIL_DISCONNECTED] = "disconnected", + [P_BE_FAIL_CONNECTING] = "connecting", + [P_BE_FAIL_READVALIDATE] = "readvalidate", + [P_BE_FAIL_BADVALIDATE] = "badvalidate", + [P_BE_FAIL_WRITING] = "writing", + [P_BE_FAIL_READING] = "reading", + [P_BE_FAIL_PARSING] = "parsing", + NULL +}; + static void proxy_backend_handler(const int fd, const short which, void *arg); +static void proxy_beconn_handler(const int fd, const short which, void *arg); static void proxy_event_handler(evutil_socket_t fd, short which, void *arg); +static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg); static void proxy_event_updater(evutil_socket_t fd, short which, void *arg); static int _flush_pending_write(mcp_backend_t *be); static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err); +static void _backend_failed(mcp_backend_t *be); static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback); static int proxy_backend_drive_machine(mcp_backend_t *be); @@ -163,10 +189,10 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) { // should be safe to hold up until their timeout. _reset_bad_backend(be, P_BE_FAIL_CONNECTING); _backend_failed_ur(be); - P_DEBUG("%s: backend failed to connect\n", __func__); + P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port); return; } - P_DEBUG("%s: backend connected\n", __func__); + P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port); be->connecting = false; be->state = mcp_backend_read; be->bad = false; @@ -402,6 +428,58 @@ static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) { STAT_UL(ctx); } +// event handler for injecting backends for processing +// currently just for initiating connections the first time. +static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) { + proxy_event_thread_t *t = arg; + +#ifdef USE_EVENTFD + uint64_t u; + if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) { + // Temporary error or wasn't actually ready to read somehow. + return; + } +#else + char buf[1]; + if (read(fd, buf, 1) != 1) { + P_DEBUG("%s: pipe read failed\n", __func__); + return; + } +#endif + + beconn_head_t head; + struct timeval tmp_time = t->tunables.connect; + + STAILQ_INIT(&head); + pthread_mutex_lock(&t->mutex); + STAILQ_CONCAT(&head, &t->beconn_head_in); + pthread_mutex_unlock(&t->mutex); + + // Think we should reuse this code path for manually instructing backends + // to disable/etc but not coding for that generically. We just need to + // check the state of the backend when it reaches here or some flags at + // least. + // FIXME: another ->stacked flag? + // Either that or remove the STAILQ code and just using an array of + // ptr's. + mcp_backend_t *be = NULL; + STAILQ_FOREACH(be, &head, beconn_next) { + be->event_thread = t; + int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); + if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) { + // if we're already connected for some reason, still push it + // through the connection handler to keep the code unified. It + // will auto-wake because the socket is writeable. + be->connecting = true; + be->can_write = false; + _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); + } else { + _reset_bad_backend(be, P_BE_FAIL_CONNECTING); + _backend_failed(be); + } + } +} + // event handler for executing backend requests static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { proxy_event_thread_t *t = arg; @@ -442,13 +520,14 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { int flags = 0; if (be->connecting) { - P_DEBUG("%s: deferring IO pending connecting\n", __func__); + P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port); } else { flags = _flush_pending_write(be); } if (flags == -1) { _reset_bad_backend(be, P_BE_FAIL_WRITING); + _backend_failed(be); } else { flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT; _set_event(be, t->base, flags, tmp_time, proxy_backend_handler); @@ -760,12 +839,12 @@ static void proxy_backend_retry_handler(const int fd, const short which, void *a mcp_backend_t *be = arg; assert(which & EV_TIMEOUT); struct timeval tmp_time = be->event_thread->tunables.retry; - _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler); + _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); } // currently just for timeouts, but certain errors should consider a backend // to be "bad" as well. -// must be called before _reset_bad_backend(), so the backend is currently +// must be called after _reset_bad_backend(), so the backend is currently // clear. // TODO (v2): currently only notes for "bad backends" in cases of timeouts or // connect failures. We need a specific connect() handler that executes a @@ -785,20 +864,10 @@ static void _backend_failed(mcp_backend_t *be) { STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1); } else { STAT_INCR(be->event_thread->ctx, backend_failed, 1); - _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler); + _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); } } -const char *proxy_be_failure_text[] = { - [P_BE_FAIL_TIMEOUT] = "timeout", - [P_BE_FAIL_DISCONNECTED] = "disconnected", - [P_BE_FAIL_CONNECTING] = "connecting", - [P_BE_FAIL_WRITING] = "writing", - [P_BE_FAIL_READING] = "reading", - [P_BE_FAIL_PARSING] = "parsing", - NULL -}; - // TODO (v2): add a second argument for assigning a specific error to all pending // IO's (ie; timeout). // The backend has gotten into a bad state (timed out, protocol desync, or @@ -932,16 +1001,34 @@ static int _flush_pending_write(mcp_backend_t *be) { return flags; } -// The libevent backend callback handler. -// If we end up resetting a backend, it will get put back into a connecting -// state. -static void proxy_backend_handler(const int fd, const short which, void *arg) { +static int _beconn_send_validate(mcp_backend_t *be) { + const char *str = "version\r\n"; + const ssize_t len = strlen(str); + + ssize_t res = write(mcmc_fd(be->client), str, len); + + if (res == -1) { + return -1; + } + + // I'm making an opinionated statement that we should be able to write + // "version\r\n" into a fresh socket without hitting EAGAIN. + if (res < len) { + return -1; + } + + return 1; +} + +// Libevent handler for backends in a connecting state. +static void proxy_beconn_handler(const int fd, const short which, void *arg) { + assert(arg != NULL); mcp_backend_t *be = arg; int flags = EV_TIMEOUT; struct timeval tmp_time = be->event_thread->tunables.read; if (which & EV_TIMEOUT) { - P_DEBUG("%s: timeout received, killing backend queue\n", __func__); + P_DEBUG("%s: backend timed out while connecting\n", __func__); _reset_bad_backend(be, P_BE_FAIL_TIMEOUT); _backend_failed(be); return; @@ -949,9 +1036,7 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) { if (which & EV_WRITE) { be->can_write = true; - // TODO (v2): move connect routine to its own function? - // - hard to do right now because we can't (easily?) edit libevent - // events. + if (be->connecting) { int err = 0; // We were connecting, now ensure we're properly connected. @@ -961,20 +1046,127 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) { // should be safe to hold up until their timeout. _reset_bad_backend(be, P_BE_FAIL_CONNECTING); _backend_failed(be); - P_DEBUG("%s: backend failed to connect\n", __func__); + P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port); return; } - P_DEBUG("%s: backend connected\n", __func__); + P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port); be->connecting = false; be->state = mcp_backend_read; be->bad = false; be->failed_count = 0; + + be->validating = true; + // TODO: make validation optional. + + if (_beconn_send_validate(be) == -1) { + _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE); + _backend_failed(be); + return; + } else { + // buffer should be empty during validation stage. + assert(be->rbufused == 0); + _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); + } + } + + // TODO: currently never taken, until validation is made optional. + if (!be->validating) { + int res = _flush_pending_write(be); + if (res == -1) { + _reset_bad_backend(be, P_BE_FAIL_WRITING); + _backend_failed(be); + return; + } + flags |= res; } + } + + if (which & EV_READ) { + assert(be->validating); + + int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused, READ_BUFFER_SIZE - be->rbufused, 0); + if (read > 0) { + mcmc_resp_t r; + be->rbufused += read; + + int status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r); + if (status == MCMC_ERR) { + // Needed more data for a version line, somehow. I feel like + // this should set off some alarms, but it is possible. + if (r.code == MCMC_WANT_READ) { + _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); + return; + } + + _reset_bad_backend(be, P_BE_FAIL_READVALIDATE); + _backend_failed(be); + return; + } + + if (r.code != MCMC_CODE_VERSION) { + _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE); + _backend_failed(be); + return; + } + + be->validating = false; + be->rbufused = 0; + } else if (read == 0) { + // not connected or error. + _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED); + _backend_failed(be); + return; + } else if (read == -1) { + // sit on epoll again. + if (errno != EAGAIN && errno != EWOULDBLOCK) { + _reset_bad_backend(be, P_BE_FAIL_READING); + _backend_failed(be); + return; + } + _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); + } + + // Passed validation, don't need to re-read, flush any pending writes. int res = _flush_pending_write(be); if (res == -1) { _reset_bad_backend(be, P_BE_FAIL_WRITING); + _backend_failed(be); return; } + flags |= res; + } + + // Still pending requests to read or write. + if (!be->validating && !STAILQ_EMPTY(&be->io_head)) { + _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler); + } + +} + +// The libevent backend callback handler. +// If we end up resetting a backend, it will get put back into a connecting +// state. +static void proxy_backend_handler(const int fd, const short which, void *arg) { + mcp_backend_t *be = arg; + int flags = EV_TIMEOUT; + struct timeval tmp_time = be->event_thread->tunables.read; + + if (which & EV_TIMEOUT) { + P_DEBUG("%s: timeout received, killing backend queue\n", __func__); + _reset_bad_backend(be, P_BE_FAIL_TIMEOUT); + _backend_failed(be); + return; + } + + if (which & EV_WRITE) { + be->can_write = true; + int res = _flush_pending_write(be); + if (res == -1) { + _reset_bad_backend(be, P_BE_FAIL_WRITING); + _backend_failed(be); + return; + } + flags |= res; } if (which & EV_READ) { @@ -987,16 +1179,19 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) { int res = proxy_backend_drive_machine(be); if (res == -1) { _reset_bad_backend(be, P_BE_FAIL_PARSING); + _backend_failed(be); return; } } else if (read == 0) { // not connected or error. _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED); + _backend_failed(be); return; } else if (read == -1) { // sit on epoll again. if (errno != EAGAIN && errno != EWOULDBLOCK) { _reset_bad_backend(be, P_BE_FAIL_READING); + _backend_failed(be); return; } } @@ -1092,9 +1287,13 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) { #ifdef USE_EVENTFD event_set(&t->notify_event, t->event_fd, EV_READ | EV_PERSIST, proxy_event_handler, t); + event_set(&t->beconn_event, t->be_event_fd, + EV_READ | EV_PERSIST, proxy_event_beconn, t); #else event_set(&t->notify_event, t->notify_receive_fd, EV_READ | EV_PERSIST, proxy_event_handler, t); + event_set(&t->beconn_event, t->be_notify_receive_fd, + EV_READ | EV_PERSIST, proxy_event_beconn, t); #endif evtimer_set(&t->clock_event, proxy_event_updater, t); @@ -1107,7 +1306,11 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) { fprintf(stderr, "Can't monitor libevent notify pipe\n"); exit(1); } - + event_base_set(t->base, &t->beconn_event); + if (event_add(&t->beconn_event, 0) == -1) { + fprintf(stderr, "Can't monitor libevent notify pipe\n"); + exit(1); + } } |