Diffstat (limited to 'proxy_network.c')
-rw-r--r--  proxy_network.c  1120
1 file changed, 1120 insertions(+), 0 deletions(-)
diff --git a/proxy_network.c b/proxy_network.c
new file mode 100644
index 0000000..73735a0
--- /dev/null
+++ b/proxy_network.c
@@ -0,0 +1,1120 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// Functions related to the backend handler thread.
+
+#include "proxy.h"
+
+static void proxy_backend_handler(const int fd, const short which, void *arg);
+static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
+static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
+static int _flush_pending_write(mcp_backend_t *be);
+static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err);
+static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
+static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread);
+
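+// Pull every inbound IO off the event thread's shared inbound queue and
+// attach it to its backend's private queue (fast-failing IO's aimed at bad
+// backends); returns the number of IO's attached so callers can skip the
+// event work when there's nothing to do.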
+static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
+ io_head_t head;
+
+ STAILQ_INIT(&head);
+ STAILQ_INIT(&t->be_head);
+
+ // Pull the entire stack of inbound into local queue.
+ pthread_mutex_lock(&t->mutex);
+ STAILQ_CONCAT(&head, &t->io_head_in);
+ pthread_mutex_unlock(&t->mutex);
+
+ int io_count = 0;
+ int be_count = 0;
+ while (!STAILQ_EMPTY(&head)) {
+ io_pending_proxy_t *io = STAILQ_FIRST(&head);
+ io->flushed = false;
+ mcp_backend_t *be = io->backend;
+ // So the backend can retrieve its event base.
+ be->event_thread = t;
+
+ // _no_ mutex on backends; they are owned by the event thread.
+ STAILQ_REMOVE_HEAD(&head, io_next);
+ if (be->bad) {
+ P_DEBUG("%s: fast failing request to bad backend\n", __func__);
+ io->client_resp->status = MCMC_ERR;
+ return_io_pending((io_pending_t *)io);
+ continue;
+ }
+ STAILQ_INSERT_TAIL(&be->io_head, io, io_next);
+ be->depth++;
+ io_count++;
+ if (!be->stacked) {
+ be->stacked = true;
+ STAILQ_INSERT_TAIL(&t->be_head, be, be_next);
+ be_count++;
+ }
+ }
+ //P_DEBUG("%s: io/be counts for syscalls [%d/%d]\n", __func__, io_count, be_count);
+ return io_count;
+}
+
+#ifdef HAVE_LIBURING
+static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
+static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts);
+static void _proxy_evthr_evset_be_retry(mcp_backend_t *be);
+
+static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) {
+ proxy_event_thread_t *t = udata;
+ proxy_ctx_t *ctx = t->ctx;
+
+ _proxy_evthr_evset_clock(t);
+
+ // we reuse the "global stats" lock since it's hardly ever used.
+ STAT_L(ctx);
+ memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
+ STAT_UL(ctx);
+}
+
+// No-op at the moment. When the linked timeout fires, uring returns the
+// linked request (read/write/poll/etc) with an interrupted/timeout/cancelled
+// error, so we don't need to explicitly handle timeouts here.
+// I'm leaving the structure in to simplify the callback routine; since
+// timeouts rarely fire, the extra code shouldn't matter.
+static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ return;
+}
+
+static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur);
+}
+
+static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
+ struct io_uring_sqe *sqe;
+ if (be->ur_te_ev.set)
+ return;
+
+ be->ur_te_ev.cb = proxy_backend_retry_handler_ur;
+ be->ur_te_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // TODO (v2): NULL?
+
+ io_uring_prep_timeout(sqe, &be->event_thread->tunables.retry_ur, 0, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+ be->ur_te_ev.set = true;
+}
+
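+// Count the failure; past the configured limit, mark the backend bad and wait
+// on the retry timer, otherwise re-arm the connect/write poll with the retry
+// timeout.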
+static void _backend_failed_ur(mcp_backend_t *be) {
+ if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
+ P_DEBUG("%s: marking backend as bad\n", __func__);
+ be->bad = true;
+ _proxy_evthr_evset_be_retry(be);
+ } else {
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.retry_ur);
+ }
+}
+
+// Read handler: completion of a recv() on a backend socket.
+static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ int bread = cqe->res;
+ char *rbuf = NULL;
+ size_t toread = 0;
+ // Error or disconnection.
+ if (bread <= 0) {
+ _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ // NOTE: Not calling _backend_failed_ur here, since if the backend is busted
+ // it should be caught by the connect routine.
+ // This is probably not _always_ true in practice. Leaving this note
+ // so I can re-evaluate later.
+ return;
+ }
+
+ int res = proxy_backend_drive_machine(be, bread, &rbuf, &toread);
+ P_DEBUG("%s: bread: %d res: %d toread: %lu\n", __func__, bread, res, toread);
+
+ if (res > 0) {
+ _proxy_evthr_evset_be_read(be, rbuf, toread, &be->event_thread->tunables.read_ur);
+ } else if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ return;
+ }
+
+ // TODO (v2): when exactly do we need to reset the backend handler?
+ if (!STAILQ_EMPTY(&be->io_head)) {
+ _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
+ }
+}
+
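+// Write-poll handler: the backend socket became writeable (or a nonblocking
+// connect completed), so finish the connect check, flush pending writes, and
+// arm a read for the responses.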
+static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ int flags = 0;
+
+ be->can_write = true;
+ if (be->connecting) {
+ int err = 0;
+ // We were connecting, now ensure we're properly connected.
+ if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
+ // kick the bad backend, clear the queue, retry later.
+ // TODO (v2): if a connect fails, anything currently in the queue
+ // should be safe to hold until it times out.
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
+ _backend_failed_ur(be);
+ P_DEBUG("%s: backend failed to connect\n", __func__);
+ return;
+ }
+ P_DEBUG("%s: backend connected\n", __func__);
+ be->connecting = false;
+ be->state = mcp_backend_read;
+ be->bad = false;
+ be->failed_count = 0;
+ }
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ return;
+ }
+
+ if (flags & EV_WRITE) {
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur);
+ }
+
+ _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
+}
+
+static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ proxy_event_thread_t *t = udata;
+
+ // liburing always uses eventfd for the notifier.
+ // *cqe has our result.
+ assert(cqe->res != -EINVAL);
+ if (cqe->res != sizeof(eventfd_t)) {
+ P_DEBUG("%s: cqe->res: %d\n", __func__, cqe->res);
+ // FIXME (v2): figure out if this is impossible, and how to handle if not.
+ assert(1 == 0);
+ }
+
+ // need to re-arm the listener every time.
+ _proxy_evthr_evset_notifier(t);
+
+ // TODO (v2): sqe queues for writing to backends
+ // - _ur handler for backend write completion is to set a read event and
+ // re-submit. ugh.
+ // Should be possible to have standing reads, but the flow is harder, so
+ // let's optimize that later. (ie; allow matching reads to a request but
+ // don't actually dequeue anything until both read and write are confirmed)
+ if (_proxy_event_handler_dequeue(t) == 0) {
+ //P_DEBUG("%s: no IO's to complete\n", __func__);
+ return;
+ }
+
+ // Re-walk each backend and set events as required.
+ mcp_backend_t *be = NULL;
+
+ // TODO (v2): for each backend, queue writev's into sqe's
+ // move the backend sqe bits into a write complete handler
+ STAILQ_FOREACH(be, &t->be_head, be_next) {
+ be->stacked = false;
+ int flags = 0;
+
+ if (be->connecting) {
+ P_DEBUG("%s: deferring IO pending connecting\n", __func__);
+ flags |= EV_WRITE;
+ } else {
+ flags = _flush_pending_write(be);
+ }
+
+ if (flags == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ } else {
+ // FIXME (v2): needs a re-write to handle sqe starvation.
+ // FIXME (v2): can't actually set the read here? need to confirm _some_
+ // write first?
+ if (flags & EV_WRITE) {
+ _proxy_evthr_evset_be_wrpoll(be, &t->tunables.connect_ur);
+ }
+ if (flags & EV_READ) {
+ _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &t->tunables.read_ur);
+ }
+ }
+ }
+}
+
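+// Arm a POLLOUT on the backend socket with a linked timeout; the completion
+// lands in proxy_backend_wrhandler_ur (the timeout in its no-op handler).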
+static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts) {
+ struct io_uring_sqe *sqe;
+ if (be->ur_wr_ev.set)
+ return;
+
+ be->ur_wr_ev.cb = proxy_backend_wrhandler_ur;
+ be->ur_wr_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // FIXME (v2): NULL?
+
+ io_uring_prep_poll_add(sqe, mcmc_fd(be->client), POLLOUT);
+ io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
+ be->ur_wr_ev.set = true;
+
+ sqe->flags |= IOSQE_IO_LINK;
+
+ // add a timeout.
+ be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
+ be->ur_te_ev.udata = be;
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+
+ io_uring_prep_link_timeout(sqe, ts, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+}
+
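+// Arm a recv() into the given buffer with a linked timeout; the completion
+// lands in proxy_backend_handler_ur.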
+static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts) {
+ P_DEBUG("%s: setting: %lu\n", __func__, len);
+ struct io_uring_sqe *sqe;
+ if (be->ur_rd_ev.set) {
+ P_DEBUG("%s: already set\n", __func__);
+ return;
+ }
+
+ be->ur_rd_ev.cb = proxy_backend_handler_ur;
+ be->ur_rd_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // FIXME (v2): NULL?
+ assert(be->rbuf != NULL);
+ io_uring_prep_recv(sqe, mcmc_fd(be->client), buf, len, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_rd_ev);
+ be->ur_rd_ev.set = true;
+
+ sqe->flags |= IOSQE_IO_LINK;
+
+ // add a timeout.
+ // TODO (v2): we can pre-set the event data and avoid always re-doing it here.
+ be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
+ be->ur_te_ev.udata = be;
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+
+ io_uring_prep_link_timeout(sqe, ts, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+}
+
+static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) {
+ struct io_uring_sqe *sqe;
+
+ sqe = io_uring_get_sqe(&t->ring);
+ // FIXME (v2): NULL?
+
+ io_uring_prep_timeout(sqe, &updater_ts, 0, 0);
+ io_uring_sqe_set_data(sqe, &t->ur_clock_event);
+ t->ur_clock_event.set = true;
+}
+
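+// (Re)arm the eventfd read that wakes this thread when new work is queued;
+// the completion lands in proxy_event_handler_ur.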
+static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) {
+ struct io_uring_sqe *sqe;
+ P_DEBUG("%s: setting: %d\n", __func__, t->ur_notify_event.set);
+ if (t->ur_notify_event.set)
+ return;
+
+ t->ur_notify_event.cb = proxy_event_handler_ur;
+ t->ur_notify_event.udata = t;
+
+ sqe = io_uring_get_sqe(&t->ring);
+ // FIXME (v2): NULL?
+ io_uring_prep_read(sqe, t->event_fd, &t->event_counter, sizeof(eventfd_t), 0);
+ io_uring_sqe_set_data(sqe, &t->ur_notify_event);
+}
+
+// TODO (v2): CQE's can generate many SQE's, so we might need to occasionally check
+// for space free in the sqe queue and submit in the middle of the cqe
+// foreach.
+// There might be better places to do this, but I think it's cleaner if
+// submission and cqe can stay in this function.
+// TODO (v2): The problem is io_uring_submit() can deadlock if too many cqe's
+// are waiting.
+// Need to understand if this means "CQE's ready to be picked up" or "CQE's in
+// flight", because the former is much easier to work around (ie; only run the
+// backend handler after dequeuing everything else)
+// TODO (v2): IORING_FEAT_NODROP: io_uring_submit() should return -EBUSY if out
+// of CQ event slots. Therefore we might starve SQE's if we were low beforehand.
+// - switch from for_each_cqe to doing one at a time (for now?)
+// - track # of sqe's allocated in the cqe loop.
+// - stop and submit if we've >= half the queue.
+// - ??? when can a CQE generate > 1 SQE?
+// - wrhandler_ur can set both wrpoll and read
+// - if CQE's can gen > 1 SQE at a time, we'll eventually starve.
+// - proper flow: CQE's can enqueue backends to be processed.
+// - after CQE's are processed, backends are processed (ouch?)
+// - if SQE's starve here, bail but keep the BE queue.
+// - then submit SQE's
+static void *proxy_event_thread_ur(void *arg) {
+ proxy_event_thread_t *t = arg;
+ struct io_uring_cqe *cqe;
+
+ P_DEBUG("%s: starting\n", __func__);
+
+ logger_create(); // TODO (v2): add logger to struct
+ while (1) {
+ P_DEBUG("%s: submit and wait\n", __func__);
+ io_uring_submit_and_wait(&t->ring, 1);
+ //P_DEBUG("%s: sqe submitted: %d\n", __func__, ret);
+
+ uint32_t head = 0;
+ uint32_t count = 0;
+
+ io_uring_for_each_cqe(&t->ring, head, cqe) {
+ P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count);
+
+ proxy_event_t *pe = io_uring_cqe_get_data(cqe);
+ pe->set = false;
+ pe->cb(pe->udata, cqe);
+
+ count++;
+ }
+
+ P_DEBUG("%s: advancing [count:%d]\n", __func__, count);
+ io_uring_cq_advance(&t->ring, count);
+ }
+
+ return NULL;
+}
+#endif // HAVE_LIBURING
+
+// We need to get timeout/retry/etc updates to the event thread(s)
+// occasionally. I'd like to have a better interface around this where updates
+// are shipped directly, but this is good enough to start with.
+static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
+ proxy_event_thread_t *t = arg;
+ proxy_ctx_t *ctx = t->ctx;
+
+ // TODO (v2): double check how much of this boilerplate is still necessary?
+ // reschedule the clock event.
+ evtimer_del(&t->clock_event);
+
+ evtimer_set(&t->clock_event, proxy_event_updater, t);
+ event_base_set(t->base, &t->clock_event);
+ struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
+ evtimer_add(&t->clock_event, &rate);
+
+ // we reuse the "global stats" lock since it's hardly ever used.
+ STAT_L(ctx);
+ memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
+ STAT_UL(ctx);
+}
+
+// event handler for executing backend requests
+static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
+ proxy_event_thread_t *t = arg;
+
+#ifdef USE_EVENTFD
+ uint64_t u;
+ if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ // Temporary error or wasn't actually ready to read somehow.
+ return;
+ }
+#else
+ char buf[1];
+ // TODO (v2): This is a lot more fatal than it should be. Can it fail? Can
+ // it blow up the server?
+ // TODO (v2): a cross-platform method of speeding this up would be nice. With
+ // event fds we can queue N events and wake up once here.
+ // If we're pulling one byte out of the pipe at a time here it'll just
+ // wake us up too often.
+ // If the pipe is O_NONBLOCK then maybe just a larger read would work?
+ if (read(fd, buf, 1) != 1) {
+ P_DEBUG("%s: pipe read failed\n", __func__);
+ return;
+ }
+#endif
+
+ if (_proxy_event_handler_dequeue(t) == 0) {
+ //P_DEBUG("%s: no IO's to complete\n", __func__);
+ return;
+ }
+
+ // Re-walk each backend and set events as required.
+ mcp_backend_t *be = NULL;
+ struct timeval tmp_time = t->tunables.read;
+
+ // FIXME (v2): _set_event() is buggy, see notes on function.
+ STAILQ_FOREACH(be, &t->be_head, be_next) {
+ be->stacked = false;
+ int flags = 0;
+
+ if (be->connecting) {
+ P_DEBUG("%s: deferring IO pending connecting\n", __func__);
+ } else {
+ flags = _flush_pending_write(be);
+ }
+
+ if (flags == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ } else {
+ flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT;
+ _set_event(be, t->base, flags, tmp_time, proxy_backend_handler);
+ }
+ }
+
+}
+
+void *proxy_event_thread(void *arg) {
+ proxy_event_thread_t *t = arg;
+
+ logger_create(); // TODO (v2): add logger ptr to structure
+ event_base_loop(t->base, 0);
+ event_base_free(t->base);
+
+ // TODO (v2): join bt threads, free array.
+
+ return NULL;
+}
+
+// FIXME (v2): if we use the newer API the various pending checks can be adjusted.
+static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback) {
+ // FIXME (v2): chicken and egg.
+ // can't check if pending if the structure was just calloc'ed (sigh)
+ // don't want to double test here. should be able to event_assign but
+ // not add anything during initialization, but need the owner thread's
+ // event base.
+ int pending = 0;
+ if (event_initialized(&be->event)) {
+ pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
+ }
+ if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) {
+ event_del(&be->event); // replace existing event.
+ }
+
+ // if we can't write, we could be connecting.
+ // TODO (v2): always check for READ in case some commands were sent
+ // successfully? The flags could be tracked on *be and reset in the
+ // handler, perhaps?
+ event_assign(&be->event, base, mcmc_fd(be->client),
+ flags, callback, be);
+ event_add(&be->event, &t);
+}
+
+// NOTES:
+// - mcp_backend_read: grab req_stack_head, do things
+// read -> next, want_read -> next | read_end, etc.
+// issue: want read back to read_end as necessary. special state?
+// - it's fine: p->client_resp->type.
+// - mcp_backend_next: advance, consume, etc.
+// TODO (v2): second argument with enum for a specific error.
+// - probably just for logging. for app if any of these errors shouldn't
+// result in killing the request stack!
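+// Returns EV_* flags for the caller to re-arm, or -1 on an unrecoverable
+// error; *rbuf and *toread are filled in when the machine knows where the
+// next read should land.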
+static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread) {
+ bool stop = false;
+ io_pending_proxy_t *p = NULL;
+ mcmc_resp_t tmp_resp; // helper for testing for GET's END marker.
+ int flags = 0;
+
+ p = STAILQ_FIRST(&be->io_head);
+ if (p == NULL) {
+ // got a read event, but nothing was queued.
+ // probably means a disconnect event.
+ // TODO (v2): could probably confirm this by attempting to read the
+ // socket, getsockopt, or something else simply for logging or
+ // statistical purposes.
+ // In this case we know it's going to be a close so error.
+ flags = -1;
+ P_DEBUG("%s: read event but nothing in IO queue\n", __func__);
+ return flags;
+ }
+
+ while (!stop) {
+ mcp_resp_t *r;
+ int res = 1;
+ int remain = 0;
+ char *newbuf = NULL;
+
+ switch(be->state) {
+ case mcp_backend_read:
+ assert(p != NULL);
+ P_DEBUG("%s: [read] bread: %d\n", __func__, bread);
+
+ if (bread == 0) {
+ // haven't actually done a read yet; figure out where/what.
+ *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread);
+ return EV_READ;
+ } else {
+ be->state = mcp_backend_parse;
+ }
+ break;
+ case mcp_backend_parse:
+ r = p->client_resp;
+ r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &r->resp);
+ // FIXME (v2): Don't like how this mcmc API ended up.
+ bread = 0; // only add the bread once per loop.
+ if (r->status != MCMC_OK) {
+ P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status);
+ if (r->status == MCMC_WANT_READ) {
+ flags |= EV_READ;
+ be->state = mcp_backend_read;
+ stop = true;
+ break;
+ } else {
+ flags = -1;
+ stop = true;
+ break;
+ }
+ }
+
+ // we actually don't care about anything but the value length
+ // TODO (v2): if vlen != vlen_read, pull an item and copy the data.
+ int extra_space = 0;
+ switch (r->resp.type) {
+ case MCMC_RESP_GET:
+ // We're in GET mode. We only support one key per
+ // GET in the proxy backends, so we need to check
+ // for an END later.
+ extra_space = ENDLEN;
+ break;
+ case MCMC_RESP_END:
+ // this is a MISS from a GET request
+ // or final handler from a STAT request.
+ assert(r->resp.vlen == 0);
+ break;
+ case MCMC_RESP_META:
+ // we can handle meta responses easily since they're self
+ // contained.
+ break;
+ case MCMC_RESP_GENERIC:
+ case MCMC_RESP_NUMERIC:
+ break;
+ // TODO (v2): No-op response?
+ default:
+ P_DEBUG("%s: Unhandled response from backend: %d\n", __func__, r->resp.type);
+ // unhandled :(
+ flags = -1;
+ stop = true;
+ break;
+ }
+
+ if (res) {
+ if (p->ascii_multiget && r->resp.type == MCMC_RESP_END) {
+ // Ascii multiget hack mode; consume END's
+ be->state = mcp_backend_next;
+ break;
+ }
+
+ // r->resp.reslen + r->resp.vlen is the total length of the response.
+ // TODO (v2): need to associate a buffer with this response...
+ // for now lets abuse write_and_free on mc_resp and simply malloc the
+ // space we need, stuffing it into the resp object.
+
+ r->blen = r->resp.reslen + r->resp.vlen;
+ r->buf = malloc(r->blen + extra_space);
+ if (r->buf == NULL) {
+ flags = -1; // TODO (v2): specific error.
+ stop = true;
+ break;
+ }
+
+ P_DEBUG("%s: r->status: %d, r->bread: %d, r->vlen: %lu\n", __func__, r->status, r->bread, r->resp.vlen);
+ if (r->resp.vlen != r->resp.vlen_read) {
+ P_DEBUG("%s: got a short read, moving to want_read\n", __func__);
+ // copy the partial and advance mcmc's buffer digestion.
+ // FIXME (v2): should call this for both cases?
+ // special function for advancing mcmc's buffer for
+ // reading a value? perhaps a flag to skip the data copy
+ // when it's unnecessary?
+ memcpy(r->buf, be->rbuf, r->resp.reslen);
+ r->status = mcmc_read_value_buf(be->client, r->buf+r->resp.reslen, r->resp.vlen, &r->bread);
+ be->state = mcp_backend_want_read;
+ break;
+ } else {
+ // mcmc's already counted the value as read if it fit in
+ // the original buffer...
+ memcpy(r->buf, be->rbuf, r->resp.reslen+r->resp.vlen_read);
+ }
+ } else {
+ // TODO (v2): no response read?
+ // nothing currently sets res to 0. should remove if that
+ // never comes up and handle the error entirely above.
+ P_DEBUG("%s: no response read from backend\n", __func__);
+ flags = -1;
+ stop = true;
+ break;
+ }
+
+ if (r->resp.type == MCMC_RESP_GET) {
+ // advance the buffer
+ newbuf = mcmc_buffer_consume(be->client, &remain);
+ if (remain != 0) {
+ // TODO (v2): don't need to shuffle buffer with better API
+ memmove(be->rbuf, newbuf, remain);
+ }
+
+ be->state = mcp_backend_read_end;
+ } else {
+ be->state = mcp_backend_next;
+ }
+
+ break;
+ case mcp_backend_read_end:
+ r = p->client_resp;
+ // we need to ensure the next data in the stream is "END\r\n";
+ // if not, the stack is desynced and we lose it.
+
+ r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &tmp_resp);
+ P_DEBUG("%s [read_end]: r->status: %d, bread: %d resp.type:%d\n", __func__, r->status, bread, tmp_resp.type);
+ if (r->status != MCMC_OK) {
+ if (r->status == MCMC_WANT_READ) {
+ *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread);
+ return EV_READ;
+ } else {
+ flags = -1; // TODO (v2): specific error.
+ stop = true;
+ }
+ break;
+ } else if (tmp_resp.type != MCMC_RESP_END) {
+ // TODO (v2): specific error about protocol desync
+ flags = -1;
+ stop = true;
+ break;
+ } else {
+ // response is good.
+ // FIXME (v2): copy what the server actually sent?
+ if (!p->ascii_multiget) {
+ // sigh... if part of a multiget we need to eat the END
+ // markers down here.
+ memcpy(r->buf+r->blen, ENDSTR, ENDLEN);
+ r->blen += 5;
+ }
+ }
+
+ be->state = mcp_backend_next;
+
+ break;
+ case mcp_backend_want_read:
+ // Continuing a read from earlier
+ r = p->client_resp;
+ // take bread input and see if we're done reading the value,
+ // else advance, set buffers, return next.
+ if (bread > 0) {
+ r->bread += bread;
+ bread = 0;
+ }
+ P_DEBUG("%s: [want_read] r->bread: %d vlen: %lu\n", __func__, r->bread, r->resp.vlen);
+
+ if (r->bread >= r->resp.vlen) {
+ // all done copying data.
+ if (r->resp.type == MCMC_RESP_GET) {
+ newbuf = mcmc_buffer_consume(be->client, &remain);
+ // Shouldn't be anything in the buffer if we had to run to
+ // want_read to read the value.
+ assert(remain == 0);
+ be->state = mcp_backend_read_end;
+ } else {
+ be->state = mcp_backend_next;
+ }
+ } else {
+ // signal to caller to issue a read.
+ *rbuf = r->buf+r->resp.reslen+r->bread;
+ *toread = r->resp.vlen - r->bread;
+ // need to retry later.
+ flags |= EV_READ;
+ stop = true;
+ }
+
+ break;
+ case mcp_backend_next:
+ // set the head here. when we break the head will be correct.
+ STAILQ_REMOVE_HEAD(&be->io_head, io_next);
+ be->depth--;
+ // have to do the q->count-- and == 0 and redispatch_conn()
+ // stuff here. The moment we call return_io here we
+ // don't own *p anymore.
+ return_io_pending((io_pending_t *)p);
+
+ if (STAILQ_EMPTY(&be->io_head)) {
+ // TODO (v2): suspicious of this code. audit harder?
+ stop = true;
+ } else {
+ p = STAILQ_FIRST(&be->io_head);
+ }
+
+ // mcmc_buffer_consume() - if leftover, keep processing
+ // IO's.
+ // if no more data in buffer, need to re-set stack head and re-set
+ // event.
+ remain = 0;
+ // TODO (v2): do we need to yield every N reads?
+ newbuf = mcmc_buffer_consume(be->client, &remain);
+ P_DEBUG("%s: [next] remain: %d\n", __func__, remain);
+ be->state = mcp_backend_read;
+ if (remain != 0) {
+ // data trailing in the buffer, for a different request.
+ memmove(be->rbuf, newbuf, remain);
+ be->state = mcp_backend_parse;
+ P_DEBUG("read buffer remaining: %p %d\n", (void *)be, remain);
+ } else {
+ // need to read more data, buffer is empty.
+ stop = true;
+ }
+
+ break;
+ default:
+ // TODO (v2): at some point (after v1?) this should attempt to recover,
+ // though we should only get here from memory corruption and
+ // bailing may be the right thing to do.
+ fprintf(stderr, "%s: invalid backend state: %d\n", __func__, be->state);
+ assert(false);
+ } // switch
+ } // while
+
+ return flags;
+}
+
+// All we need to do here is schedule the backend to attempt to connect again.
+static void proxy_backend_retry_handler(const int fd, const short which, void *arg) {
+ mcp_backend_t *be = arg;
+ assert(which & EV_TIMEOUT);
+ struct timeval tmp_time = be->event_thread->tunables.retry;
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+}
+
+// currently just for timeouts, but certain errors should consider a backend
+// to be "bad" as well.
+// must be called after _reset_bad_backend(), so the backend queue is already
+// clear.
+// TODO (v2): currently only notes for "bad backends" in cases of timeouts or
+// connect failures. We need a specific connect() handler that executes a
+// "version" call to at least check that the backend isn't speaking garbage.
+// In theory backends can fail such that responses are constantly garbage,
+// but it's more likely an app is doing something bad and culling the backend
+// may prevent any other clients from talking to that backend. In
+// that case we need to track if clients are causing errors consistently and
+// block them instead. That's more challenging so leaving a note instead
+// of doing this now :)
+static void _backend_failed(mcp_backend_t *be) {
+ struct timeval tmp_time = be->event_thread->tunables.retry;
+ if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
+ P_DEBUG("%s: marking backend as bad\n", __func__);
+ be->bad = true;
+ _set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler);
+ STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
+ } else {
+ STAT_INCR(be->event_thread->ctx, backend_failed, 1);
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+ }
+}
+
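+// Human-readable labels for enum proxy_be_failures; indexed by the error code
+// and passed to LOGGER_LOG in _reset_bad_backend().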
+const char *proxy_be_failure_text[] = {
+ [P_BE_FAIL_TIMEOUT] = "timeout",
+ [P_BE_FAIL_DISCONNECTED] = "disconnected",
+ [P_BE_FAIL_CONNECTING] = "connecting",
+ [P_BE_FAIL_WRITING] = "writing",
+ [P_BE_FAIL_READING] = "reading",
+ [P_BE_FAIL_PARSING] = "parsing",
+ NULL
+};
+
+// TODO (v2): add a second argument for assigning a specific error to all pending
+// IO's (ie; timeout).
+// The backend has gotten into a bad state (timed out, protocol desync, or
+// some other supposedly unrecoverable error): purge the queue and
+// cycle the socket.
+// Note that some types of errors may not require flushing the queue and
+// should be fixed as they're figured out.
+// _must_ be called from within the event thread.
+static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) {
+ io_pending_proxy_t *io = NULL;
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ // TODO (v2): Unsure if this is the best way of surfacing errors to lua,
+ // but will do for V1.
+ io->client_resp->status = MCMC_ERR;
+ return_io_pending((io_pending_t *)io);
+ }
+
+ STAILQ_INIT(&be->io_head);
+
+ mcmc_disconnect(be->client);
+ int status = mcmc_connect(be->client, be->ip, be->port, be->connect_flags);
+ if (status == MCMC_CONNECTED) {
+ // TODO (v2): unexpected but lets let it be here.
+ be->connecting = false;
+ be->can_write = true;
+ } else if (status == MCMC_CONNECTING) {
+ be->connecting = true;
+ be->can_write = false;
+ } else {
+ // TODO (v2): failed to immediately re-establish the connection.
+ // need to put the BE into a bad/retry state.
+ // FIXME (v2): until we get an event to specifically handle connecting and
+ // bad server handling, attempt to force a reconnect here the next
+ // time a request comes through.
+ // The event thread will attempt to write to the backend, fail, then
+ // end up in this routine again.
+ be->connecting = false;
+ be->can_write = true;
+ }
+
+ LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_BE_ERROR, NULL, proxy_be_failure_text[err], be->ip, be->port);
+
+ return 0;
+}
+
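+// Stage the iovecs of every unflushed IO into be->write_iovs, stopping before
+// BE_IOV_MAX would be exceeded; returns the number of iovecs staged and adds
+// the byte count to *tosend.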
+static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) {
+ struct iovec *iovs = be->write_iovs;
+ io_pending_proxy_t *io = NULL;
+ int iovused = 0;
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ if (io->flushed)
+ continue;
+
+ if (io->iovcnt + iovused > BE_IOV_MAX) {
+ // Signal to caller that we need to keep writing, if writeable.
+ // FIXME (v2): can certainly refactor this to loop instead of waiting
+ // for a writeable event.
+ *tosend += 1;
+ break;
+ }
+
+ memcpy(&iovs[iovused], io->iov, sizeof(struct iovec)*io->iovcnt);
+ iovused += io->iovcnt;
+ *tosend += io->iovbytes;
+ }
+ return iovused;
+}
+
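+// writev() whatever _prep_pending_write() staged. Returns EV_* flags for the
+// caller to re-arm (EV_WRITE if data remains, EV_READ once something was
+// fully flushed), or -1 on a hard write error.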
+static int _flush_pending_write(mcp_backend_t *be) {
+ int flags = 0;
+ unsigned int tosend = 0;
+ int iovcnt = _prep_pending_write(be, &tosend);
+
+ ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt);
+ if (sent > 0) {
+ io_pending_proxy_t *io = NULL;
+ if (sent < tosend) {
+ flags |= EV_WRITE;
+ }
+
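+ // Walk the queue in order, consuming 'sent' bytes: an IO is flushed only
+ // once all of its iovecs were written; a partially written iovec is
+ // trimmed so the next writev() resumes where this one stopped.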
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ bool flushed = true;
+ if (io->flushed)
+ continue;
+
+ if (sent >= io->iovbytes) {
+ // short circuit for common case.
+ sent -= io->iovbytes;
+ } else {
+ for (int x = 0; x < io->iovcnt; x++) {
+ struct iovec *iov = &io->iov[x];
+ if (sent >= iov->iov_len) {
+ sent -= iov->iov_len;
+ iov->iov_len = 0;
+ } else {
+ iov->iov_len -= sent;
+ sent = 0;
+ flushed = false;
+ break;
+ }
+ }
+ }
+ io->flushed = flushed;
+
+ if (flushed) {
+ flags |= EV_READ;
+ }
+ if (sent <= 0) {
+ // really shouldn't be negative, though.
+ assert(sent >= 0);
+ break;
+ }
+ } // STAILQ_FOREACH
+ } else if (sent == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ be->can_write = false;
+ flags |= EV_WRITE;
+ } else {
+ flags = -1;
+ }
+ }
+
+ return flags;
+}
+
+// The libevent backend callback handler.
+// If we end up resetting a backend, it will get put back into a connecting
+// state.
+static void proxy_backend_handler(const int fd, const short which, void *arg) {
+ mcp_backend_t *be = arg;
+ int flags = EV_TIMEOUT;
+ struct timeval tmp_time = be->event_thread->tunables.read;
+
+ if (which & EV_TIMEOUT) {
+ P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
+ _reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
+ _backend_failed(be);
+ return;
+ }
+
+ if (which & EV_WRITE) {
+ be->can_write = true;
+ // TODO (v2): move connect routine to its own function?
+ // - hard to do right now because we can't (easily?) edit libevent
+ // events.
+ if (be->connecting) {
+ int err = 0;
+ // We were connecting, now ensure we're properly connected.
+ if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
+ // kick the bad backend, clear the queue, retry later.
+ // FIXME (v2): if a connect fails, anything currently in the queue
+ // should be safe to hold until it times out.
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
+ _backend_failed(be);
+ P_DEBUG("%s: backend failed to connect\n", __func__);
+ return;
+ }
+ P_DEBUG("%s: backend connected\n", __func__);
+ be->connecting = false;
+ be->state = mcp_backend_read;
+ be->bad = false;
+ be->failed_count = 0;
+ }
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ return;
+ }
+ }
+
+ if (which & EV_READ) {
+ // We do the syscall here before diving into the state machine to allow a
+ // common code path for io_uring/epoll
+ int res = 1;
+ int read = 0;
+ while (res > 0) {
+ char *rbuf = NULL;
+ size_t toread = 0;
+ // pass in how much was read since the last call;
+ // get back the buffer to read into and how much to read.
+ res = proxy_backend_drive_machine(be, read, &rbuf, &toread);
+ P_DEBUG("%s: res: %d toread: %lu\n", __func__, res, toread);
+
+ if (res > 0) {
+ read = recv(mcmc_fd(be->client), rbuf, toread, 0);
+ P_DEBUG("%s: read: %d\n", __func__, read);
+ if (read == 0) {
+ // not connected or error.
+ _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ return;
+ } else if (read == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break; // sit on epoll again.
+ } else {
+ _reset_bad_backend(be, P_BE_FAIL_READING);
+ return;
+ }
+ }
+ } else if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_PARSING);
+ return;
+ } else {
+ break;
+ }
+ }
+
+#ifdef PROXY_DEBUG
+ if (!STAILQ_EMPTY(&be->io_head)) {
+ P_DEBUG("backend has leftover IOs: %d\n", be->depth);
+ }
+#endif
+ }
+
+ // Still pending requests to read or write.
+ if (!STAILQ_EMPTY(&be->io_head)) {
+ flags |= EV_READ; // FIXME (v2): might not be necessary here, but ensures we get a disconnect event.
+ _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler);
+ }
+}
+
+// TODO (v2): IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple
+// event threads.
+void proxy_init_evthread_events(proxy_event_thread_t *t) {
+#ifdef HAVE_LIBURING
+ bool use_uring = t->ctx->use_uring;
+ struct io_uring_params p = {0};
+ assert(t->event_fd); // uring only exists where eventfd also does.
+
+ // Set up the CQSIZE to be much larger than the SQ size, since backpressure
+ // issues can cause us to block on SQ submissions and, as a network server,
+ // stuff happens.
+
+ if (use_uring) {
+ p.flags = IORING_SETUP_CQSIZE;
+ p.cq_entries = PRING_QUEUE_CQ_ENTRIES;
+ int ret = io_uring_queue_init_params(PRING_QUEUE_SQ_ENTRIES, &t->ring, &p);
+ if (ret) {
+ perror("io_uring_queue_init_params");
+ exit(1);
+ }
+ if (!(p.features & IORING_FEAT_NODROP)) {
+ fprintf(stderr, "uring: kernel missing IORING_FEAT_NODROP, using libevent\n");
+ use_uring = false;
+ }
+ if (!(p.features & IORING_FEAT_SINGLE_MMAP)) {
+ fprintf(stderr, "uring: kernel missing IORING_FEAT_SINGLE_MMAP, using libevent\n");
+ use_uring = false;
+ }
+ if (!(p.features & IORING_FEAT_FAST_POLL)) {
+ fprintf(stderr, "uring: kernel missing IORING_FEAT_FAST_POLL, using libevent\n");
+ use_uring = false;
+ }
+
+ if (use_uring) {
+ // FIXME (v2): Sigh. we need a blocking event_fd for io_uring but we have a
+ // chicken-and-egg problem here. Need a better structure... in the meantime
+ // re-create the event_fd.
+ close(t->event_fd);
+ t->event_fd = eventfd(0, 0);
+ // FIXME (v2): hack for event init.
+ t->ur_notify_event.set = false;
+ _proxy_evthr_evset_notifier(t);
+
+ // periodic data updater for event thread
+ t->ur_clock_event.cb = proxy_event_updater_ur;
+ t->ur_clock_event.udata = t;
+ t->ur_clock_event.set = false;
+ _proxy_evthr_evset_clock(t);
+
+ t->use_uring = true;
+ return;
+ } else {
+ // Decided to not use io_uring, so don't waste memory.
+ t->use_uring = false;
+ io_uring_queue_exit(&t->ring);
+ }
+ } else {
+ t->use_uring = false;
+ }
+#endif
+
+ struct event_config *ev_config;
+ ev_config = event_config_new();
+ event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
+ t->base = event_base_new_with_config(ev_config);
+ event_config_free(ev_config);
+ if (! t->base) {
+ fprintf(stderr, "Can't allocate event base\n");
+ exit(1);
+ }
+
+ // listen for notifications.
+ // NULL was thread_libevent_process
+ // FIXME (v2): use modern format? (event_assign)
+#ifdef USE_EVENTFD
+ event_set(&t->notify_event, t->event_fd,
+ EV_READ | EV_PERSIST, proxy_event_handler, t);
+#else
+ event_set(&t->notify_event, t->notify_receive_fd,
+ EV_READ | EV_PERSIST, proxy_event_handler, t);
+#endif
+
+ evtimer_set(&t->clock_event, proxy_event_updater, t);
+ event_base_set(t->base, &t->clock_event);
+ struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
+ evtimer_add(&t->clock_event, &rate);
+
+ event_base_set(t->base, &t->notify_event);
+ if (event_add(&t->notify_event, 0) == -1) {
+ fprintf(stderr, "Can't monitor libevent notify pipe\n");
+ exit(1);
+ }
+
+}
+
+