summaryrefslogtreecommitdiff
path: root/proxy_network.c
diff options
context:
space:
mode:
Diffstat (limited to 'proxy_network.c')
-rw-r--r--proxy_network.c257
1 files changed, 230 insertions, 27 deletions
diff --git a/proxy_network.c b/proxy_network.c
index 162c39b..26a0dbf 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -3,11 +3,37 @@
#include "proxy.h"
+enum proxy_be_failures {
+ P_BE_FAIL_TIMEOUT = 0,
+ P_BE_FAIL_DISCONNECTED,
+ P_BE_FAIL_CONNECTING,
+ P_BE_FAIL_READVALIDATE,
+ P_BE_FAIL_BADVALIDATE,
+ P_BE_FAIL_WRITING,
+ P_BE_FAIL_READING,
+ P_BE_FAIL_PARSING,
+};
+
+const char *proxy_be_failure_text[] = {
+ [P_BE_FAIL_TIMEOUT] = "timeout",
+ [P_BE_FAIL_DISCONNECTED] = "disconnected",
+ [P_BE_FAIL_CONNECTING] = "connecting",
+ [P_BE_FAIL_READVALIDATE] = "readvalidate",
+ [P_BE_FAIL_BADVALIDATE] = "badvalidate",
+ [P_BE_FAIL_WRITING] = "writing",
+ [P_BE_FAIL_READING] = "reading",
+ [P_BE_FAIL_PARSING] = "parsing",
+ NULL
+};
+
static void proxy_backend_handler(const int fd, const short which, void *arg);
+static void proxy_beconn_handler(const int fd, const short which, void *arg);
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
+static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg);
static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
static int _flush_pending_write(mcp_backend_t *be);
static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err);
+static void _backend_failed(mcp_backend_t *be);
static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
static int proxy_backend_drive_machine(mcp_backend_t *be);
@@ -163,10 +189,10 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
// should be safe to hold up until their timeout.
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed_ur(be);
- P_DEBUG("%s: backend failed to connect\n", __func__);
+ P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
return;
}
- P_DEBUG("%s: backend connected\n", __func__);
+ P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
be->connecting = false;
be->state = mcp_backend_read;
be->bad = false;
@@ -402,6 +428,58 @@ static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
STAT_UL(ctx);
}
+// event handler for injecting backends for processing
+// currently just for initiating connections the first time.
+static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
+ proxy_event_thread_t *t = arg;
+
+#ifdef USE_EVENTFD
+ uint64_t u;
+ if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ // Temporary error or wasn't actually ready to read somehow.
+ return;
+ }
+#else
+ char buf[1];
+ if (read(fd, buf, 1) != 1) {
+ P_DEBUG("%s: pipe read failed\n", __func__);
+ return;
+ }
+#endif
+
+ beconn_head_t head;
+ struct timeval tmp_time = t->tunables.connect;
+
+ STAILQ_INIT(&head);
+ pthread_mutex_lock(&t->mutex);
+ STAILQ_CONCAT(&head, &t->beconn_head_in);
+ pthread_mutex_unlock(&t->mutex);
+
+ // Think we should reuse this code path for manually instructing backends
+ // to disable/etc but not coding for that generically. We just need to
+ // check the state of the backend when it reaches here or some flags at
+ // least.
+ // FIXME: another ->stacked flag?
+ // Either that or remove the STAILQ code and just using an array of
+ // ptr's.
+ mcp_backend_t *be = NULL;
+ STAILQ_FOREACH(be, &head, beconn_next) {
+ be->event_thread = t;
+ int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
+ if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
+ // if we're already connected for some reason, still push it
+ // through the connection handler to keep the code unified. It
+ // will auto-wake because the socket is writeable.
+ be->connecting = true;
+ be->can_write = false;
+ _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
+ } else {
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
+ _backend_failed(be);
+ }
+ }
+}
+
// event handler for executing backend requests
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
proxy_event_thread_t *t = arg;
@@ -442,13 +520,14 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
int flags = 0;
if (be->connecting) {
- P_DEBUG("%s: deferring IO pending connecting\n", __func__);
+ P_DEBUG("%s: deferring IO pending connecting (%s:%s)\n", __func__, be->name, be->port);
} else {
flags = _flush_pending_write(be);
}
if (flags == -1) {
_reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
} else {
flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT;
_set_event(be, t->base, flags, tmp_time, proxy_backend_handler);
@@ -760,12 +839,12 @@ static void proxy_backend_retry_handler(const int fd, const short which, void *a
mcp_backend_t *be = arg;
assert(which & EV_TIMEOUT);
struct timeval tmp_time = be->event_thread->tunables.retry;
- _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
}
// currently just for timeouts, but certain errors should consider a backend
// to be "bad" as well.
-// must be called before _reset_bad_backend(), so the backend is currently
+// must be called after _reset_bad_backend(), so the backend is currently
// clear.
// TODO (v2): currently only notes for "bad backends" in cases of timeouts or
// connect failures. We need a specific connect() handler that executes a
@@ -785,20 +864,10 @@ static void _backend_failed(mcp_backend_t *be) {
STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
} else {
STAT_INCR(be->event_thread->ctx, backend_failed, 1);
- _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
}
}
-const char *proxy_be_failure_text[] = {
- [P_BE_FAIL_TIMEOUT] = "timeout",
- [P_BE_FAIL_DISCONNECTED] = "disconnected",
- [P_BE_FAIL_CONNECTING] = "connecting",
- [P_BE_FAIL_WRITING] = "writing",
- [P_BE_FAIL_READING] = "reading",
- [P_BE_FAIL_PARSING] = "parsing",
- NULL
-};
-
// TODO (v2): add a second argument for assigning a specific error to all pending
// IO's (ie; timeout).
// The backend has gotten into a bad state (timed out, protocol desync, or
@@ -932,16 +1001,34 @@ static int _flush_pending_write(mcp_backend_t *be) {
return flags;
}
-// The libevent backend callback handler.
-// If we end up resetting a backend, it will get put back into a connecting
-// state.
-static void proxy_backend_handler(const int fd, const short which, void *arg) {
+static int _beconn_send_validate(mcp_backend_t *be) {
+ const char *str = "version\r\n";
+ const ssize_t len = strlen(str);
+
+ ssize_t res = write(mcmc_fd(be->client), str, len);
+
+ if (res == -1) {
+ return -1;
+ }
+
+ // I'm making an opinionated statement that we should be able to write
+ // "version\r\n" into a fresh socket without hitting EAGAIN.
+ if (res < len) {
+ return -1;
+ }
+
+ return 1;
+}
+
+// Libevent handler for backends in a connecting state.
+static void proxy_beconn_handler(const int fd, const short which, void *arg) {
+ assert(arg != NULL);
mcp_backend_t *be = arg;
int flags = EV_TIMEOUT;
struct timeval tmp_time = be->event_thread->tunables.read;
if (which & EV_TIMEOUT) {
- P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
+ P_DEBUG("%s: backend timed out while connecting\n", __func__);
_reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
_backend_failed(be);
return;
@@ -949,9 +1036,7 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
if (which & EV_WRITE) {
be->can_write = true;
- // TODO (v2): move connect routine to its own function?
- // - hard to do right now because we can't (easily?) edit libevent
- // events.
+
if (be->connecting) {
int err = 0;
// We were connecting, now ensure we're properly connected.
@@ -961,20 +1046,127 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
// should be safe to hold up until their timeout.
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed(be);
- P_DEBUG("%s: backend failed to connect\n", __func__);
+ P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
return;
}
- P_DEBUG("%s: backend connected\n", __func__);
+ P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
be->connecting = false;
be->state = mcp_backend_read;
be->bad = false;
be->failed_count = 0;
+
+ be->validating = true;
+ // TODO: make validation optional.
+
+ if (_beconn_send_validate(be) == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
+ _backend_failed(be);
+ return;
+ } else {
+ // buffer should be empty during validation stage.
+ assert(be->rbufused == 0);
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ }
+ }
+
+ // TODO: currently never taken, until validation is made optional.
+ if (!be->validating) {
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
+ return;
+ }
+ flags |= res;
}
+ }
+
+ if (which & EV_READ) {
+ assert(be->validating);
+
+ int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused, READ_BUFFER_SIZE - be->rbufused, 0);
+ if (read > 0) {
+ mcmc_resp_t r;
+ be->rbufused += read;
+
+ int status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r);
+ if (status == MCMC_ERR) {
+ // Needed more data for a version line, somehow. I feel like
+ // this should set off some alarms, but it is possible.
+ if (r.code == MCMC_WANT_READ) {
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ return;
+ }
+
+ _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
+ _backend_failed(be);
+ return;
+ }
+
+ if (r.code != MCMC_CODE_VERSION) {
+ _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
+ _backend_failed(be);
+ return;
+ }
+
+ be->validating = false;
+ be->rbufused = 0;
+ } else if (read == 0) {
+ // not connected or error.
+ _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ _backend_failed(be);
+ return;
+ } else if (read == -1) {
+ // sit on epoll again.
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ _reset_bad_backend(be, P_BE_FAIL_READING);
+ _backend_failed(be);
+ return;
+ }
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ }
+
+ // Passed validation, don't need to re-read, flush any pending writes.
int res = _flush_pending_write(be);
if (res == -1) {
_reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
return;
}
+ flags |= res;
+ }
+
+ // Still pending requests to read or write.
+ if (!be->validating && !STAILQ_EMPTY(&be->io_head)) {
+ _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler);
+ }
+
+}
+
+// The libevent backend callback handler.
+// If we end up resetting a backend, it will get put back into a connecting
+// state.
+static void proxy_backend_handler(const int fd, const short which, void *arg) {
+ mcp_backend_t *be = arg;
+ int flags = EV_TIMEOUT;
+ struct timeval tmp_time = be->event_thread->tunables.read;
+
+ if (which & EV_TIMEOUT) {
+ P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
+ _reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
+ _backend_failed(be);
+ return;
+ }
+
+ if (which & EV_WRITE) {
+ be->can_write = true;
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed(be);
+ return;
+ }
+ flags |= res;
}
if (which & EV_READ) {
@@ -987,16 +1179,19 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
int res = proxy_backend_drive_machine(be);
if (res == -1) {
_reset_bad_backend(be, P_BE_FAIL_PARSING);
+ _backend_failed(be);
return;
}
} else if (read == 0) {
// not connected or error.
_reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ _backend_failed(be);
return;
} else if (read == -1) {
// sit on epoll again.
if (errno != EAGAIN && errno != EWOULDBLOCK) {
_reset_bad_backend(be, P_BE_FAIL_READING);
+ _backend_failed(be);
return;
}
}
@@ -1092,9 +1287,13 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
#ifdef USE_EVENTFD
event_set(&t->notify_event, t->event_fd,
EV_READ | EV_PERSIST, proxy_event_handler, t);
+ event_set(&t->beconn_event, t->be_event_fd,
+ EV_READ | EV_PERSIST, proxy_event_beconn, t);
#else
event_set(&t->notify_event, t->notify_receive_fd,
EV_READ | EV_PERSIST, proxy_event_handler, t);
+ event_set(&t->beconn_event, t->be_notify_receive_fd,
+ EV_READ | EV_PERSIST, proxy_event_beconn, t);
#endif
evtimer_set(&t->clock_event, proxy_event_updater, t);
@@ -1107,7 +1306,11 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
-
+ event_base_set(t->base, &t->beconn_event);
+ if (event_add(&t->beconn_event, 0) == -1) {
+ fprintf(stderr, "Can't monitor libevent notify pipe\n");
+ exit(1);
+ }
}