summaryrefslogtreecommitdiff
path: root/proxy_network.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-02-18 15:19:09 -0800
committerdormando <dormando@rydia.net>2022-02-18 16:13:52 -0800
commit34e0359d4de223d8cde4166f7d10ae352d7ebfdf (patch)
tree041a57edfb4bb3b58aa23498681295cb71789ee5 /proxy_network.c
parentd85c379d74d92f8e9bd7ccf1ca57520f485a24f0 (diff)
downloadmemcached-34e0359d4de223d8cde4166f7d10ae352d7ebfdf.tar.gz
proxy: pull chunks into individual c files
now's a good time to at least shove functional subsections of code into their own files. Some further work to clearly separate the API's will help but looks not too terrible. Big bonus is getting the backend handling code away from the frontend handling code, which should make it easier to follow.
Diffstat (limited to 'proxy_network.c')
-rw-r--r--proxy_network.c1120
1 files changed, 1120 insertions, 0 deletions
diff --git a/proxy_network.c b/proxy_network.c
new file mode 100644
index 0000000..73735a0
--- /dev/null
+++ b/proxy_network.c
@@ -0,0 +1,1120 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// Functions related to the backend handler thread.
+
+#include "proxy.h"
+
+static void proxy_backend_handler(const int fd, const short which, void *arg);
+static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
+static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
+static int _flush_pending_write(mcp_backend_t *be);
+static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err);
+static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
+static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread);
+
+static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
+ io_head_t head;
+
+ STAILQ_INIT(&head);
+ STAILQ_INIT(&t->be_head);
+
+ // Pull the entire stack of inbound into local queue.
+ pthread_mutex_lock(&t->mutex);
+ STAILQ_CONCAT(&head, &t->io_head_in);
+ pthread_mutex_unlock(&t->mutex);
+
+ int io_count = 0;
+ int be_count = 0;
+ while (!STAILQ_EMPTY(&head)) {
+ io_pending_proxy_t *io = STAILQ_FIRST(&head);
+ io->flushed = false;
+ mcp_backend_t *be = io->backend;
+ // So the backend can retrieve its event base.
+ be->event_thread = t;
+
+ // _no_ mutex on backends. they are owned by the event thread.
+ STAILQ_REMOVE_HEAD(&head, io_next);
+ if (be->bad) {
+ P_DEBUG("%s: fast failing request to bad backend\n", __func__);
+ io->client_resp->status = MCMC_ERR;
+ return_io_pending((io_pending_t *)io);
+ continue;
+ }
+ STAILQ_INSERT_TAIL(&be->io_head, io, io_next);
+ be->depth++;
+ io_count++;
+ if (!be->stacked) {
+ be->stacked = true;
+ STAILQ_INSERT_TAIL(&t->be_head, be, be_next);
+ be_count++;
+ }
+ }
+ //P_DEBUG("%s: io/be counts for syscalls [%d/%d]\n", __func__, io_count, be_count);
+ return io_count;
+}
+
+#ifdef HAVE_LIBURING
+static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
+static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts);
+static void _proxy_evthr_evset_be_retry(mcp_backend_t *be);
+
+static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) {
+ proxy_event_thread_t *t = udata;
+ proxy_ctx_t *ctx = t->ctx;
+
+ _proxy_evthr_evset_clock(t);
+
+ // we reuse the "global stats" lock since it's hardly ever used.
+ STAT_L(ctx);
+ memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
+ STAT_UL(ctx);
+}
+
+// No-op at the moment. when the linked timeout fires uring returns the
+// linked request (read/write/poll/etc) with an interrupted/timeout/cancelled
+// error. So we don't need to explicitly handle timeouts.
+// I'm leaving the structure in to simplify the callback routine.
+// Since timeouts rarely get called the extra code here shouldn't matter.
+static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ return;
+}
+
+static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur);
+}
+
+static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
+ struct io_uring_sqe *sqe;
+ if (be->ur_te_ev.set)
+ return;
+
+ be->ur_te_ev.cb = proxy_backend_retry_handler_ur;
+ be->ur_te_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // TODO (v2): NULL?
+
+ io_uring_prep_timeout(sqe, &be->event_thread->tunables.retry_ur, 0, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+ be->ur_te_ev.set = true;
+}
+
+static void _backend_failed_ur(mcp_backend_t *be) {
+ if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
+ P_DEBUG("%s: marking backend as bad\n", __func__);
+ be->bad = true;
+ _proxy_evthr_evset_be_retry(be);
+ } else {
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.retry_ur);
+ }
+}
+
+// read handler.
+static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ int bread = cqe->res;
+ char *rbuf = NULL;
+ size_t toread = 0;
+ // Error or disconnection.
+ if (bread <= 0) {
+ _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ // NOTE: Not calling backed_failed here since if the backend is busted
+ // it should be caught by the connect routine.
+ // This is probably not _always_ true in practice. Leaving this note
+ // so I can re-evaluate later.
+ return;
+ }
+
+ int res = proxy_backend_drive_machine(be, bread, &rbuf, &toread);
+ P_DEBUG("%s: bread: %d res: %d toread: %lu\n", __func__, bread, res, toread);
+
+ if (res > 0) {
+ _proxy_evthr_evset_be_read(be, rbuf, toread, &be->event_thread->tunables.read_ur);
+ } else if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ return;
+ }
+
+ // TODO (v2): when exactly do we need to reset the backend handler?
+ if (!STAILQ_EMPTY(&be->io_head)) {
+ _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
+ }
+}
+
+static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ int flags = 0;
+
+ be->can_write = true;
+ if (be->connecting) {
+ int err = 0;
+ // We were connecting, now ensure we're properly connected.
+ if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
+ // kick the bad backend, clear the queue, retry later.
+ // TODO (v2): if a connect fails, anything currently in the queue
+ // should be safe to hold up until their timeout.
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
+ _backend_failed_ur(be);
+ P_DEBUG("%s: backend failed to connect\n", __func__);
+ return;
+ }
+ P_DEBUG("%s: backend connected\n", __func__);
+ be->connecting = false;
+ be->state = mcp_backend_read;
+ be->bad = false;
+ be->failed_count = 0;
+ }
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ return;
+ }
+
+ if (flags & EV_WRITE) {
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur);
+ }
+
+ _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
+}
+
+static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ proxy_event_thread_t *t = udata;
+
+ // liburing always uses eventfd for the notifier.
+ // *cqe has our result.
+ assert(cqe->res != -EINVAL);
+ if (cqe->res != sizeof(eventfd_t)) {
+ P_DEBUG("%s: cqe->res: %d\n", __func__, cqe->res);
+ // FIXME (v2): figure out if this is impossible, and how to handle if not.
+ assert(1 == 0);
+ }
+
+ // need to re-arm the listener every time.
+ _proxy_evthr_evset_notifier(t);
+
+ // TODO (v2): sqe queues for writing to backends
+ // - _ur handler for backend write completion is to set a read event and
+ // re-submit. ugh.
+ // Should be possible to have standing reads, but flow is harder and lets
+ // optimize that later. (ie; allow matching reads to a request but don't
+ // actually dequeue anything until both read and write are confirmed)
+ if (_proxy_event_handler_dequeue(t) == 0) {
+ //P_DEBUG("%s: no IO's to complete\n", __func__);
+ return;
+ }
+
+ // Re-walk each backend and check set event as required.
+ mcp_backend_t *be = NULL;
+
+ // TODO (v2): for each backend, queue writev's into sqe's
+ // move the backend sqe bits into a write complete handler
+ STAILQ_FOREACH(be, &t->be_head, be_next) {
+ be->stacked = false;
+ int flags = 0;
+
+ if (be->connecting) {
+ P_DEBUG("%s: deferring IO pending connecting\n", __func__);
+ flags |= EV_WRITE;
+ } else {
+ flags = _flush_pending_write(be);
+ }
+
+ if (flags == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ } else {
+ // FIXME (v2): needs a re-write to handle sqe starvation.
+ // FIXME (v2): can't actually set the read here? need to confirm _some_
+ // write first?
+ if (flags & EV_WRITE) {
+ _proxy_evthr_evset_be_wrpoll(be, &t->tunables.connect_ur);
+ }
+ if (flags & EV_READ) {
+ _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &t->tunables.read_ur);
+ }
+ }
+ }
+}
+
+static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts) {
+ struct io_uring_sqe *sqe;
+ if (be->ur_wr_ev.set)
+ return;
+
+ be->ur_wr_ev.cb = proxy_backend_wrhandler_ur;
+ be->ur_wr_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // FIXME (v2): NULL?
+
+ io_uring_prep_poll_add(sqe, mcmc_fd(be->client), POLLOUT);
+ io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
+ be->ur_wr_ev.set = true;
+
+ sqe->flags |= IOSQE_IO_LINK;
+
+ // add a timeout.
+ be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
+ be->ur_te_ev.udata = be;
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+
+ io_uring_prep_link_timeout(sqe, ts, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+}
+
+static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts) {
+ P_DEBUG("%s: setting: %lu\n", __func__, len);
+ struct io_uring_sqe *sqe;
+ if (be->ur_rd_ev.set) {
+ P_DEBUG("%s: already set\n", __func__);
+ return;
+ }
+
+ be->ur_rd_ev.cb = proxy_backend_handler_ur;
+ be->ur_rd_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // FIXME (v2): NULL?
+ assert(be->rbuf != NULL);
+ io_uring_prep_recv(sqe, mcmc_fd(be->client), buf, len, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_rd_ev);
+ be->ur_rd_ev.set = true;
+
+ sqe->flags |= IOSQE_IO_LINK;
+
+ // add a timeout.
+ // TODO (v2): we can pre-set the event data and avoid always re-doing it here.
+ be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
+ be->ur_te_ev.udata = be;
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+
+ io_uring_prep_link_timeout(sqe, ts, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+}
+
+static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) {
+ struct io_uring_sqe *sqe;
+
+ sqe = io_uring_get_sqe(&t->ring);
+ // FIXME (v2): NULL?
+
+ io_uring_prep_timeout(sqe, &updater_ts, 0, 0);
+ io_uring_sqe_set_data(sqe, &t->ur_clock_event);
+ t->ur_clock_event.set = true;
+}
+
+static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) {
+ struct io_uring_sqe *sqe;
+ P_DEBUG("%s: setting: %d\n", __func__, t->ur_notify_event.set);
+ if (t->ur_notify_event.set)
+ return;
+
+ t->ur_notify_event.cb = proxy_event_handler_ur;
+ t->ur_notify_event.udata = t;
+
+ sqe = io_uring_get_sqe(&t->ring);
+ // FIXME (v2): NULL?
+ io_uring_prep_read(sqe, t->event_fd, &t->event_counter, sizeof(eventfd_t), 0);
+ io_uring_sqe_set_data(sqe, &t->ur_notify_event);
+}
+
+// TODO (v2): CQE's can generate many SQE's, so we might need to occasionally check
+// for space free in the sqe queue and submit in the middle of the cqe
+// foreach.
+// There might be better places to do this, but I think it's cleaner if
+// submission and cqe can stay in this function.
+// TODO (v2): The problem is io_submit() can deadlock if too many cqe's are
+// waiting.
+// Need to understand if this means "CQE's ready to be picked up" or "CQE's in
+// flight", because the former is much easier to work around (ie; only run the
+// backend handler after dequeuing everything else)
+// TODO (v2): IOURING_FEAT_NODROP: uring_submit() should return -EBUSY if out of CQ
+// events slots. Therefore might starve SQE's if we were low beforehand.
+// - switch from for_each_cqe to doing one at a time (for now?)
+// - track # of sqe's allocated in the cqe loop.
+// - stop and submit if we've >= half the queue.
+// - ??? when can a CQE generate > 1 SQE?
+// - wrhandler_ur can set both wrpoll and read
+// - if CQE's can gen > 1 SQE at a time, we'll eventually starve.
+// - proper flow: CQE's can enqueue backends to be processed.
+// - after CQE's are processed, backends are processed (ouch?)
+// - if SQE's starve here, bail but keep the BE queue.
+// - then submit SQE's
+static void *proxy_event_thread_ur(void *arg) {
+ proxy_event_thread_t *t = arg;
+ struct io_uring_cqe *cqe;
+
+ P_DEBUG("%s: starting\n", __func__);
+
+ logger_create(); // TODO (v2): add logger to struct
+ while (1) {
+ P_DEBUG("%s: submit and wait\n", __func__);
+ io_uring_submit_and_wait(&t->ring, 1);
+ //P_DEBUG("%s: sqe submitted: %d\n", __func__, ret);
+
+ uint32_t head = 0;
+ uint32_t count = 0;
+
+ io_uring_for_each_cqe(&t->ring, head, cqe) {
+ P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count);
+
+ proxy_event_t *pe = io_uring_cqe_get_data(cqe);
+ pe->set = false;
+ pe->cb(pe->udata, cqe);
+
+ count++;
+ }
+
+ P_DEBUG("%s: advancing [count:%d]\n", __func__, count);
+ io_uring_cq_advance(&t->ring, count);
+ }
+
+ return NULL;
+}
+#endif // HAVE_LIBURING
+
+// We need to get timeout/retry/etc updates to the event thread(s)
+// occasionally. I'd like to have a better inteface around this where updates
+// are shipped directly; but this is good enough to start with.
+static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
+ proxy_event_thread_t *t = arg;
+ proxy_ctx_t *ctx = t->ctx;
+
+ // TODO (v2): double check how much of this boilerplate is still necessary?
+ // reschedule the clock event.
+ evtimer_del(&t->clock_event);
+
+ evtimer_set(&t->clock_event, proxy_event_updater, t);
+ event_base_set(t->base, &t->clock_event);
+ struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
+ evtimer_add(&t->clock_event, &rate);
+
+ // we reuse the "global stats" lock since it's hardly ever used.
+ STAT_L(ctx);
+ memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
+ STAT_UL(ctx);
+}
+
+// event handler for executing backend requests
+static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
+ proxy_event_thread_t *t = arg;
+
+#ifdef USE_EVENTFD
+ uint64_t u;
+ if (read(fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {
+ // Temporary error or wasn't actually ready to read somehow.
+ return;
+ }
+#else
+ char buf[1];
+ // TODO (v2): This is a lot more fatal than it should be. can it fail? can
+ // it blow up the server?
+ // TODO (v2): a cross-platform method of speeding this up would be nice. With
+ // event fds we can queue N events and wakeup once here.
+ // If we're pulling one byte out of the pipe at a time here it'll just
+ // wake us up too often.
+ // If the pipe is O_NONBLOCK then maybe just a larger read would work?
+ if (read(fd, buf, 1) != 1) {
+ P_DEBUG("%s: pipe read failed\n", __func__);
+ return;
+ }
+#endif
+
+ if (_proxy_event_handler_dequeue(t) == 0) {
+ //P_DEBUG("%s: no IO's to complete\n", __func__);
+ return;
+ }
+
+ // Re-walk each backend and check set event as required.
+ mcp_backend_t *be = NULL;
+ struct timeval tmp_time = t->tunables.read;
+
+ // FIXME (v2): _set_event() is buggy, see notes on function.
+ STAILQ_FOREACH(be, &t->be_head, be_next) {
+ be->stacked = false;
+ int flags = 0;
+
+ if (be->connecting) {
+ P_DEBUG("%s: deferring IO pending connecting\n", __func__);
+ } else {
+ flags = _flush_pending_write(be);
+ }
+
+ if (flags == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ } else {
+ flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT;
+ _set_event(be, t->base, flags, tmp_time, proxy_backend_handler);
+ }
+ }
+
+}
+
+void *proxy_event_thread(void *arg) {
+ proxy_event_thread_t *t = arg;
+
+ logger_create(); // TODO (v2): add logger ptr to structure
+ event_base_loop(t->base, 0);
+ event_base_free(t->base);
+
+ // TODO (v2): join bt threads, free array.
+
+ return NULL;
+}
+
+// FIXME (v2): if we use the newer API the various pending checks can be adjusted.
+static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback) {
+ // FIXME (v2): chicken and egg.
+ // can't check if pending if the structure is was calloc'ed (sigh)
+ // don't want to double test here. should be able to event_assign but
+ // not add anything during initialization, but need the owner thread's
+ // event base.
+ int pending = 0;
+ if (event_initialized(&be->event)) {
+ pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
+ }
+ if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) {
+ event_del(&be->event); // replace existing event.
+ }
+
+ // if we can't write, we could be connecting.
+ // TODO (v2): always check for READ in case some commands were sent
+ // successfully? The flags could be tracked on *be and reset in the
+ // handler, perhaps?
+ event_assign(&be->event, base, mcmc_fd(be->client),
+ flags, callback, be);
+ event_add(&be->event, &t);
+}
+
+// NOTES:
+// - mcp_backend_read: grab req_stack_head, do things
+// read -> next, want_read -> next | read_end, etc.
+// issue: want read back to read_end as necessary. special state?
+// - it's fine: p->client_resp->type.
+// - mcp_backend_next: advance, consume, etc.
+// TODO (v2): second argument with enum for a specific error.
+// - probably just for logging. for app if any of these errors shouldn't
+// result in killing the request stack!
+static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread) {
+ bool stop = false;
+ io_pending_proxy_t *p = NULL;
+ mcmc_resp_t tmp_resp; // helper for testing for GET's END marker.
+ int flags = 0;
+
+ p = STAILQ_FIRST(&be->io_head);
+ if (p == NULL) {
+ // got a read event, but nothing was queued.
+ // probably means a disconnect event.
+ // TODO (v2): could probably confirm this by attempting to read the
+ // socket, getsockopt, or something else simply for logging or
+ // statistical purposes.
+ // In this case we know it's going to be a close so error.
+ flags = -1;
+ P_DEBUG("%s: read event but nothing in IO queue\n", __func__);
+ return flags;
+ }
+
+ while (!stop) {
+ mcp_resp_t *r;
+ int res = 1;
+ int remain = 0;
+ char *newbuf = NULL;
+
+ switch(be->state) {
+ case mcp_backend_read:
+ assert(p != NULL);
+ P_DEBUG("%s: [read] bread: %d\n", __func__, bread);
+
+ if (bread == 0) {
+ // haven't actually done a read yet; figure out where/what.
+ *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread);
+ return EV_READ;
+ } else {
+ be->state = mcp_backend_parse;
+ }
+ break;
+ case mcp_backend_parse:
+ r = p->client_resp;
+ r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &r->resp);
+ // FIXME (v2): Don't like how this mcmc API ended up.
+ bread = 0; // only add the bread once per loop.
+ if (r->status != MCMC_OK) {
+ P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status);
+ if (r->status == MCMC_WANT_READ) {
+ flags |= EV_READ;
+ be->state = mcp_backend_read;
+ stop = true;
+ break;
+ } else {
+ flags = -1;
+ stop = true;
+ break;
+ }
+ }
+
+ // we actually don't care about anything but the value length
+ // TODO (v2): if vlen != vlen_read, pull an item and copy the data.
+ int extra_space = 0;
+ switch (r->resp.type) {
+ case MCMC_RESP_GET:
+ // We're in GET mode. we only support one key per
+ // GET in the proxy backends, so we need to later check
+ // for an END.
+ extra_space = ENDLEN;
+ break;
+ case MCMC_RESP_END:
+ // this is a MISS from a GET request
+ // or final handler from a STAT request.
+ assert(r->resp.vlen == 0);
+ break;
+ case MCMC_RESP_META:
+ // we can handle meta responses easily since they're self
+ // contained.
+ break;
+ case MCMC_RESP_GENERIC:
+ case MCMC_RESP_NUMERIC:
+ break;
+ // TODO (v2): No-op response?
+ default:
+ P_DEBUG("%s: Unhandled response from backend: %d\n", __func__, r->resp.type);
+ // unhandled :(
+ flags = -1;
+ stop = true;
+ break;
+ }
+
+ if (res) {
+ if (p->ascii_multiget && r->resp.type == MCMC_RESP_END) {
+ // Ascii multiget hack mode; consume END's
+ be->state = mcp_backend_next;
+ break;
+ }
+
+ // r->resp.reslen + r->resp.vlen is the total length of the response.
+ // TODO (v2): need to associate a buffer with this response...
+ // for now lets abuse write_and_free on mc_resp and simply malloc the
+ // space we need, stuffing it into the resp object.
+
+ r->blen = r->resp.reslen + r->resp.vlen;
+ r->buf = malloc(r->blen + extra_space);
+ if (r->buf == NULL) {
+ flags = -1; // TODO (v2): specific error.
+ stop = true;
+ break;
+ }
+
+ P_DEBUG("%s: r->status: %d, r->bread: %d, r->vlen: %lu\n", __func__, r->status, r->bread, r->resp.vlen);
+ if (r->resp.vlen != r->resp.vlen_read) {
+ P_DEBUG("%s: got a short read, moving to want_read\n", __func__);
+ // copy the partial and advance mcmc's buffer digestion.
+ // FIXME (v2): should call this for both cases?
+ // special function for advancing mcmc's buffer for
+ // reading a value? perhaps a flag to skip the data copy
+ // when it's unnecessary?
+ memcpy(r->buf, be->rbuf, r->resp.reslen);
+ r->status = mcmc_read_value_buf(be->client, r->buf+r->resp.reslen, r->resp.vlen, &r->bread);
+ be->state = mcp_backend_want_read;
+ break;
+ } else {
+ // mcmc's already counted the value as read if it fit in
+ // the original buffer...
+ memcpy(r->buf, be->rbuf, r->resp.reslen+r->resp.vlen_read);
+ }
+ } else {
+ // TODO (v2): no response read?
+ // nothing currently sets res to 0. should remove if that
+ // never comes up and handle the error entirely above.
+ P_DEBUG("%s: no response read from backend\n", __func__);
+ flags = -1;
+ stop = true;
+ break;
+ }
+
+ if (r->resp.type == MCMC_RESP_GET) {
+ // advance the buffer
+ newbuf = mcmc_buffer_consume(be->client, &remain);
+ if (remain != 0) {
+ // TODO (v2): don't need to shuffle buffer with better API
+ memmove(be->rbuf, newbuf, remain);
+ }
+
+ be->state = mcp_backend_read_end;
+ } else {
+ be->state = mcp_backend_next;
+ }
+
+ break;
+ case mcp_backend_read_end:
+ r = p->client_resp;
+ // we need to ensure the next data in the stream is "END\r\n"
+ // if not, the stack is desynced and we lose it.
+
+ r->status = mcmc_parse_buf(be->client, be->rbuf, bread, &tmp_resp);
+ P_DEBUG("%s [read_end]: r->status: %d, bread: %d resp.type:%d\n", __func__, r->status, bread, tmp_resp.type);
+ if (r->status != MCMC_OK) {
+ if (r->status == MCMC_WANT_READ) {
+ *rbuf = mcmc_read_prep(be->client, be->rbuf, READ_BUFFER_SIZE, toread);
+ return EV_READ;
+ } else {
+ flags = -1; // TODO (v2): specific error.
+ stop = true;
+ }
+ break;
+ } else if (tmp_resp.type != MCMC_RESP_END) {
+ // TODO (v2): specific error about protocol desync
+ flags = -1;
+ stop = true;
+ break;
+ } else {
+ // response is good.
+ // FIXME (v2): copy what the server actually sent?
+ if (!p->ascii_multiget) {
+ // sigh... if part of a multiget we need to eat the END
+ // markers down here.
+ memcpy(r->buf+r->blen, ENDSTR, ENDLEN);
+ r->blen += 5;
+ }
+ }
+
+ be->state = mcp_backend_next;
+
+ break;
+ case mcp_backend_want_read:
+ // Continuing a read from earlier
+ r = p->client_resp;
+ // take bread input and see if we're done reading the value,
+ // else advance, set buffers, return next.
+ if (bread > 0) {
+ r->bread += bread;
+ bread = 0;
+ }
+ P_DEBUG("%s: [want_read] r->bread: %d vlen: %lu\n", __func__, r->bread, r->resp.vlen);
+
+ if (r->bread >= r->resp.vlen) {
+ // all done copying data.
+ if (r->resp.type == MCMC_RESP_GET) {
+ newbuf = mcmc_buffer_consume(be->client, &remain);
+ // Shouldn't be anything in the buffer if we had to run to
+ // want_read to read the value.
+ assert(remain == 0);
+ be->state = mcp_backend_read_end;
+ } else {
+ be->state = mcp_backend_next;
+ }
+ } else {
+ // signal to caller to issue a read.
+ *rbuf = r->buf+r->resp.reslen+r->bread;
+ *toread = r->resp.vlen - r->bread;
+ // need to retry later.
+ flags |= EV_READ;
+ stop = true;
+ }
+
+ break;
+ case mcp_backend_next:
+ // set the head here. when we break the head will be correct.
+ STAILQ_REMOVE_HEAD(&be->io_head, io_next);
+ be->depth--;
+ // have to do the q->count-- and == 0 and redispatch_conn()
+ // stuff here. The moment we call return_io here we
+ // don't own *p anymore.
+ return_io_pending((io_pending_t *)p);
+
+ if (STAILQ_EMPTY(&be->io_head)) {
+ // TODO (v2): suspicious of this code. audit harder?
+ stop = true;
+ } else {
+ p = STAILQ_FIRST(&be->io_head);
+ }
+
+ // mcmc_buffer_consume() - if leftover, keep processing
+ // IO's.
+ // if no more data in buffer, need to re-set stack head and re-set
+ // event.
+ remain = 0;
+ // TODO (v2): do we need to yield every N reads?
+ newbuf = mcmc_buffer_consume(be->client, &remain);
+ P_DEBUG("%s: [next] remain: %d\n", __func__, remain);
+ be->state = mcp_backend_read;
+ if (remain != 0) {
+ // data trailing in the buffer, for a different request.
+ memmove(be->rbuf, newbuf, remain);
+ be->state = mcp_backend_parse;
+ P_DEBUG("read buffer remaining: %p %d\n", (void *)be, remain);
+ } else {
+ // need to read more data, buffer is empty.
+ stop = true;
+ }
+
+ break;
+ default:
+ // TODO (v2): at some point (after v1?) this should attempt to recover,
+ // though we should only get here from memory corruption and
+ // bailing may be the right thing to do.
+ fprintf(stderr, "%s: invalid backend state: %d\n", __func__, be->state);
+ assert(false);
+ } // switch
+ } // while
+
+ return flags;
+}
+
+// All we need to do here is schedule the backend to attempt to connect again.
+static void proxy_backend_retry_handler(const int fd, const short which, void *arg) {
+ mcp_backend_t *be = arg;
+ assert(which & EV_TIMEOUT);
+ struct timeval tmp_time = be->event_thread->tunables.retry;
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+}
+
+// currently just for timeouts, but certain errors should consider a backend
+// to be "bad" as well.
+// must be called before _reset_bad_backend(), so the backend is currently
+// clear.
+// TODO (v2): currently only notes for "bad backends" in cases of timeouts or
+// connect failures. We need a specific connect() handler that executes a
+// "version" call to at least check that the backend isn't speaking garbage.
+// In theory backends can fail such that responses are constantly garbage,
+// but it's more likely an app is doing something bad and culling the backend
+// may prevent any other clients from talking to that backend. In
+// that case we need to track if clients are causing errors consistently and
+// block them instead. That's more challenging so leaving a note instead
+// of doing this now :)
+static void _backend_failed(mcp_backend_t *be) {
+ struct timeval tmp_time = be->event_thread->tunables.retry;
+ if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
+ P_DEBUG("%s: marking backend as bad\n", __func__);
+ be->bad = true;
+ _set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler);
+ STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
+ } else {
+ STAT_INCR(be->event_thread->ctx, backend_failed, 1);
+ _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
+ }
+}
+
+const char *proxy_be_failure_text[] = {
+ [P_BE_FAIL_TIMEOUT] = "timeout",
+ [P_BE_FAIL_DISCONNECTED] = "disconnected",
+ [P_BE_FAIL_CONNECTING] = "connecting",
+ [P_BE_FAIL_WRITING] = "writing",
+ [P_BE_FAIL_READING] = "reading",
+ [P_BE_FAIL_PARSING] = "parsing",
+ NULL
+};
+
+// TODO (v2): add a second argument for assigning a specific error to all pending
+// IO's (ie; timeout).
+// The backend has gotten into a bad state (timed out, protocol desync, or
+// some other supposedly unrecoverable error: purge the queue and
+// cycle the socket.
+// Note that some types of errors may not require flushing the queue and
+// should be fixed as they're figured out.
+// _must_ be called from within the event thread.
+static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) {
+ io_pending_proxy_t *io = NULL;
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ // TODO (v2): Unsure if this is the best way of surfacing errors to lua,
+ // but will do for V1.
+ io->client_resp->status = MCMC_ERR;
+ return_io_pending((io_pending_t *)io);
+ }
+
+ STAILQ_INIT(&be->io_head);
+
+ mcmc_disconnect(be->client);
+ int status = mcmc_connect(be->client, be->ip, be->port, be->connect_flags);
+ if (status == MCMC_CONNECTED) {
+ // TODO (v2): unexpected but lets let it be here.
+ be->connecting = false;
+ be->can_write = true;
+ } else if (status == MCMC_CONNECTING) {
+ be->connecting = true;
+ be->can_write = false;
+ } else {
+ // TODO (v2): failed to immediately re-establish the connection.
+ // need to put the BE into a bad/retry state.
+ // FIXME (v2): until we get an event to specifically handle connecting and
+ // bad server handling, attempt to force a reconnect here the next
+ // time a request comes through.
+ // The event thread will attempt to write to the backend, fail, then
+ // end up in this routine again.
+ be->connecting = false;
+ be->can_write = true;
+ }
+
+ LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_BE_ERROR, NULL, proxy_be_failure_text[err], be->ip, be->port);
+
+ return 0;
+}
+
+static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) {
+ struct iovec *iovs = be->write_iovs;
+ io_pending_proxy_t *io = NULL;
+ int iovused = 0;
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ if (io->flushed)
+ continue;
+
+ if (io->iovcnt + iovused > BE_IOV_MAX) {
+ // Signal to caller that we need to keep writing, if writeable.
+ // FIXME (v2): can certainly refactor this to loop instead of waiting
+ // for a writeable event.
+ *tosend += 1;
+ break;
+ }
+
+ memcpy(&iovs[iovused], io->iov, sizeof(struct iovec)*io->iovcnt);
+ iovused += io->iovcnt;
+ *tosend += io->iovbytes;
+ }
+ return iovused;
+}
+
+static int _flush_pending_write(mcp_backend_t *be) {
+ int flags = 0;
+ unsigned int tosend = 0;
+ int iovcnt = _prep_pending_write(be, &tosend);
+
+ ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt);
+ if (sent > 0) {
+ io_pending_proxy_t *io = NULL;
+ if (sent < tosend) {
+ flags |= EV_WRITE;
+ }
+
+ STAILQ_FOREACH(io, &be->io_head, io_next) {
+ bool flushed = true;
+ if (io->flushed)
+ continue;
+
+ if (sent >= io->iovbytes) {
+ // short circuit for common case.
+ sent -= io->iovbytes;
+ } else {
+ for (int x = 0; x < io->iovcnt; x++) {
+ struct iovec *iov = &io->iov[x];
+ if (sent >= iov->iov_len) {
+ sent -= iov->iov_len;
+ iov->iov_len = 0;
+ } else {
+ iov->iov_len -= sent;
+ sent = 0;
+ flushed = false;
+ break;
+ }
+ }
+ }
+ io->flushed = flushed;
+
+ if (flushed) {
+ flags |= EV_READ;
+ }
+ if (sent <= 0) {
+ // really shouldn't be negative, though.
+ assert(sent >= 0);
+ break;
+ }
+ } // STAILQ_FOREACH
+ } else if (sent == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ be->can_write = false;
+ flags |= EV_WRITE;
+ } else {
+ flags = -1;
+ }
+ }
+
+ return flags;
+}
+
+// The libevent backend callback handler.
+// If we end up resetting a backend, it will get put back into a connecting
+// state.
+static void proxy_backend_handler(const int fd, const short which, void *arg) {
+ mcp_backend_t *be = arg;
+ int flags = EV_TIMEOUT;
+ struct timeval tmp_time = be->event_thread->tunables.read;
+
+ if (which & EV_TIMEOUT) {
+ P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
+ _reset_bad_backend(be, P_BE_FAIL_TIMEOUT);
+ _backend_failed(be);
+ return;
+ }
+
+ if (which & EV_WRITE) {
+ be->can_write = true;
+ // TODO (v2): move connect routine to its own function?
+ // - hard to do right now because we can't (easily?) edit libevent
+ // events.
+ if (be->connecting) {
+ int err = 0;
+ // We were connecting, now ensure we're properly connected.
+ if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
+ // kick the bad backend, clear the queue, retry later.
+ // FIXME (v2): if a connect fails, anything currently in the queue
+ // should be safe to hold up until their timeout.
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
+ _backend_failed(be);
+ P_DEBUG("%s: backend failed to connect\n", __func__);
+ return;
+ }
+ P_DEBUG("%s: backend connected\n", __func__);
+ be->connecting = false;
+ be->state = mcp_backend_read;
+ be->bad = false;
+ be->failed_count = 0;
+ }
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ return;
+ }
+ }
+
+ if (which & EV_READ) {
+ // We do the syscall here before diving into the state machine to allow a
+ // common code path for io_uring/epoll
+ int res = 1;
+ int read = 0;
+ while (res > 0) {
+ char *rbuf = NULL;
+ size_t toread = 0;
+ // need to input how much was read since last call
+ // needs _output_ of the buffer to read into and how much.
+ res = proxy_backend_drive_machine(be, read, &rbuf, &toread);
+ P_DEBUG("%s: res: %d toread: %lu\n", __func__, res, toread);
+
+ if (res > 0) {
+ read = recv(mcmc_fd(be->client), rbuf, toread, 0);
+ P_DEBUG("%s: read: %d\n", __func__, read);
+ if (read == 0) {
+ // not connected or error.
+ _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ return;
+ } else if (read == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ break; // sit on epoll again.
+ } else {
+ _reset_bad_backend(be, P_BE_FAIL_READING);
+ return;
+ }
+ }
+ } else if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_PARSING);
+ return;
+ } else {
+ break;
+ }
+ }
+
+#ifdef PROXY_DEBUG
+ if (!STAILQ_EMPTY(&be->io_head)) {
+ P_DEBUG("backend has leftover IOs: %d\n", be->depth);
+ }
+#endif
+ }
+
+ // Still pending requests to read or write.
+ if (!STAILQ_EMPTY(&be->io_head)) {
+ flags |= EV_READ; // FIXME (v2): might not be necessary here, but ensures we get a disconnect event.
+ _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler);
+ }
+}
+
+// TODO (v2): IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple
+// event threads.
+void proxy_init_evthread_events(proxy_event_thread_t *t) {
+#ifdef HAVE_LIBURING
+ bool use_uring = t->ctx->use_uring;
+ struct io_uring_params p = {0};
+ assert(t->event_fd); // uring only exists where eventfd also does.
+
+ // Setup the CQSIZE to be much larger than SQ size, since backpressure
+ // issues can cause us to block on SQ submissions and as a network server,
+ // stuff happens.
+
+ if (use_uring) {
+ p.flags = IORING_SETUP_CQSIZE;
+ p.cq_entries = PRING_QUEUE_CQ_ENTRIES;
+ int ret = io_uring_queue_init_params(PRING_QUEUE_SQ_ENTRIES, &t->ring, &p);
+ if (ret) {
+ perror("io_uring_queue_init_params");
+ exit(1);
+ }
+ if (!(p.features & IORING_FEAT_NODROP)) {
+ fprintf(stderr, "uring: kernel missing IORING_FEAT_NODROP, using libevent\n");
+ use_uring = false;
+ }
+ if (!(p.features & IORING_FEAT_SINGLE_MMAP)) {
+ fprintf(stderr, "uring: kernel missing IORING_FEAT_SINGLE_MMAP, using libevent\n");
+ use_uring = false;
+ }
+ if (!(p.features & IORING_FEAT_FAST_POLL)) {
+ fprintf(stderr, "uring: kernel missing IORING_FEAT_FAST_POLL, using libevent\n");
+ use_uring = false;
+ }
+
+ if (use_uring) {
+ // FIXME (v2): Sigh. we need a blocking event_fd for io_uring but we've a
+ // chicken and egg in here. need a better structure... in meantime
+ // re-create the event_fd.
+ close(t->event_fd);
+ t->event_fd = eventfd(0, 0);
+ // FIXME (v2): hack for event init.
+ t->ur_notify_event.set = false;
+ _proxy_evthr_evset_notifier(t);
+
+ // periodic data updater for event thread
+ t->ur_clock_event.cb = proxy_event_updater_ur;
+ t->ur_clock_event.udata = t;
+ t->ur_clock_event.set = false;
+ _proxy_evthr_evset_clock(t);
+
+ t->use_uring = true;
+ return;
+ } else {
+ // Decided to not use io_uring, so don't waste memory.
+ t->use_uring = false;
+ io_uring_queue_exit(&t->ring);
+ }
+ } else {
+ t->use_uring = false;
+ }
+#endif
+
+ struct event_config *ev_config;
+ ev_config = event_config_new();
+ event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
+ t->base = event_base_new_with_config(ev_config);
+ event_config_free(ev_config);
+ if (! t->base) {
+ fprintf(stderr, "Can't allocate event base\n");
+ exit(1);
+ }
+
+ // listen for notifications.
+ // NULL was thread_libevent_process
+ // FIXME (v2): use modern format? (event_assign)
+#ifdef USE_EVENTFD
+ event_set(&t->notify_event, t->event_fd,
+ EV_READ | EV_PERSIST, proxy_event_handler, t);
+#else
+ event_set(&t->notify_event, t->notify_receive_fd,
+ EV_READ | EV_PERSIST, proxy_event_handler, t);
+#endif
+
+ evtimer_set(&t->clock_event, proxy_event_updater, t);
+ event_base_set(t->base, &t->clock_event);
+ struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
+ evtimer_add(&t->clock_event, &rate);
+
+ event_base_set(t->base, &t->notify_event);
+ if (event_add(&t->notify_event, 0) == -1) {
+ fprintf(stderr, "Can't monitor libevent notify pipe\n");
+ exit(1);
+ }
+
+}
+
+