summaryrefslogtreecommitdiff
path: root/proxy_network.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-12-14 16:48:19 -0800
committerdormando <dormando@rydia.net>2023-01-02 15:14:29 -0800
commitd15bbb16d66263c571b3743084c8ea5249bf0718 (patch)
tree776bb70f7116a440f612c205453ba36fa1b1592f /proxy_network.c
parent8b98647d1ad80dee3ddb49dfc51cd7bd07fe9297 (diff)
downloadmemcached-d15bbb16d66263c571b3743084c8ea5249bf0718.tar.gz
proxy: io_uring work and simplify write flush
expands on the io_uring port for the IO thread, moving write calls behind the ring. unfortunately this does not result in any performance uplift. I also tried moving the eventfd related writes (which was backed out later). The code still needs to be updated to generate SQE's outside of the CQE loop (see comments above the uring event loop). It is still generally buggy and not to be used, but keeping it up to date every once in a while will let us use it if it ever does become performant enough... This also makes use of a prior optimization to simplify the write flush code a little. It no longer needs to track how many bytes were intended to write to know if it should continue flushing later.
Diffstat (limited to 'proxy_network.c')
-rw-r--r--proxy_network.c301
1 files changed, 173 insertions, 128 deletions
diff --git a/proxy_network.c b/proxy_network.c
index 8ce4bab..9f2c410 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -41,6 +41,8 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg);
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg);
static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
+static int _prep_pending_write(mcp_backend_t *be);
+static bool _post_pending_write(mcp_backend_t *be, ssize_t sent);
static int _flush_pending_write(mcp_backend_t *be);
static void _cleanup_backend(mcp_backend_t *be);
static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err);
@@ -163,7 +165,9 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
}
#ifdef HAVE_LIBURING
+//static void _proxy_evthr_evset_wnotify(proxy_event_thread_t *t, int notify_fd);
static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
+static void _proxy_evthr_evset_be_writev(mcp_backend_t *be, int iovcnt, struct __kernel_timespec *ts);
static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts);
static void _proxy_evthr_evset_be_retry(mcp_backend_t *be);
static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timespec *ts);
@@ -172,8 +176,55 @@ static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t);
static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t);
static void _proxy_evthr_evset_clock(proxy_event_thread_t *t);
static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe);
+static void _backend_failed_ur(mcp_backend_t *be);
struct __kernel_timespec updater_ts = {.tv_sec = 3, .tv_nsec = 0};
+static void _flush_pending_write_ur(mcp_backend_t *be) {
+ // Allow us to be called with an empty stack to prevent dev errors.
+ if (STAILQ_EMPTY(&be->io_head)) {
+ return;
+ }
+
+ int iovcnt = _prep_pending_write(be);
+
+ // TODO: write timeout.
+ _proxy_evthr_evset_be_writev(be, iovcnt, &be->event_thread->tunables.read_ur);
+}
+
+// TODO: we shouldn't handle reads if a write is pending, so postwrite should
+// check for pending read data before going into read mode.
+// need be->writing flag to toggle?
+static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ P_DEBUG("%s: %d\n", __func__, cqe->res);
+ assert(cqe->res != -EINVAL);
+ int sent = cqe->res;
+ if (sent < 0) {
+ // FIXME: sent == 0 is disconnected? I keep forgetting.
+ if (sent == -EAGAIN || sent == -EWOULDBLOCK) {
+ // didn't do any writing, wait for a writeable socket.
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur);
+ } else {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed_ur(be);
+ }
+ }
+
+ if (_post_pending_write(be, sent)) {
+ // commands were flushed, set read handler.
+ _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
+ }
+
+ if (be->io_next) {
+ // still have unflushed commands, re-run write command.
+ // writev can't "block if EAGAIN" in io_uring so far as I can tell, so
+ // we have to switch to polling mode here.
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur);
+ }
+
+ // TODO: if rbufused != 0, push through drive machine?
+}
+
static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) {
proxy_event_thread_t *t = udata;
proxy_ctx_t *ctx = t->ctx;
@@ -256,43 +307,16 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
mcp_backend_t *be = udata;
- int flags = 0;
be->can_write = true;
- if (be->connecting) {
- int err = 0;
- // We were connecting, now ensure we're properly connected.
- if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
- // kick the bad backend, clear the queue, retry later.
- // TODO (v2): if a connect fails, anything currently in the queue
- // should be safe to hold up until their timeout.
- _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
- _backend_failed_ur(be);
- P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
- return;
- }
- P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
- be->connecting = false;
- be->state = mcp_backend_read;
- be->bad = false;
- be->failed_count = 0;
- }
- int res = _flush_pending_write(be);
- if (res == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- return;
- }
-
- if (flags & EV_WRITE) {
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur);
- }
+ _flush_pending_write_ur(be);
- _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
}
// a backend with an outstanding new connection has become writeable.
// check validity.
-// TODO: this gets an error if cancelled right? check it?
+// TODO: this gets an error if cancelled right?
static void proxy_backend_beconn_ur(void *udata, struct io_uring_cqe *cqe) {
mcp_backend_t *be = udata;
int err = 0;
@@ -373,16 +397,7 @@ static void proxy_backend_beconn_validate_ur(void *udata, struct io_uring_cqe *c
be->rbufused = 0;
// Passed validation, don't need to re-read, flush any pending writes.
- int res = _flush_pending_write(be);
- if (res == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- _backend_failed_ur(be);
- return;
- }
-
- if (!STAILQ_EMPTY(&be->io_head)) {
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
- }
+ _flush_pending_write(be);
}
// TODO (v3): much code shared with proxy_event_beconn, should be able to
@@ -474,27 +489,11 @@ static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
// move the backend sqe bits into a write complete handler
STAILQ_FOREACH(be, &t->be_head, be_next) {
be->stacked = false;
- int flags = 0;
- if (be->connecting) {
+ if (be->connecting || be->validating) {
P_DEBUG("%s: deferring IO pending connecting\n", __func__);
- flags |= EV_WRITE;
- } else {
- flags = _flush_pending_write(be);
- }
-
- if (flags == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
} else {
- // FIXME (v2): needs a re-write to handle sqe starvation.
- // FIXME (v2): can't actually set the read here? need to confirm _some_
- // write first?
- if (flags & EV_WRITE) {
- _proxy_evthr_evset_be_wrpoll(be, &t->tunables.connect_ur);
- }
- if (flags & EV_READ) {
- _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &t->tunables.read_ur);
- }
+ _flush_pending_write_ur(be);
}
}
}
@@ -557,6 +556,37 @@ static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timesp
io_uring_sqe_set_data(sqe, &be->ur_te_ev);
}
+// reusing the ur_wr_ev.
+static void _proxy_evthr_evset_be_writev(mcp_backend_t *be, int iovcnt, struct __kernel_timespec *ts) {
+ struct io_uring_sqe *sqe;
+ if (be->ur_wr_ev.set)
+ return;
+
+ be->ur_wr_ev.cb = proxy_backend_postwrite_ur;
+ be->ur_wr_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // FIXME (v2): NULL?
+
+ if (iovcnt == 1) {
+ io_uring_prep_write(sqe, mcmc_fd(be->client), be->write_iovs[0].iov_base, be->write_iovs[0].iov_len, 0);
+ } else {
+ io_uring_prep_writev(sqe, mcmc_fd(be->client), be->write_iovs, iovcnt, 0);
+ }
+ io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
+ be->ur_wr_ev.set = true;
+
+ sqe->flags |= IOSQE_IO_LINK;
+
+ // add a timeout.
+ be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
+ be->ur_te_ev.udata = be;
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+
+ io_uring_prep_link_timeout(sqe, ts, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+}
+
static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts) {
struct io_uring_sqe *sqe;
if (be->ur_wr_ev.set)
@@ -611,8 +641,22 @@ static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len,
io_uring_prep_link_timeout(sqe, ts, 0);
io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+
}
+// FIXME: can this be inside the function?
+//static eventfd_t dummy_event = 1;
+// TODO: in newer versions of uring we can set ignore success?
+/*static void _proxy_evthr_evset_wnotify(proxy_event_thread_t *t, int notify_fd) {
+ struct io_uring_sqe *sqe;
+
+ sqe = io_uring_get_sqe(&t->ring);
+ // FIXME (v2) NULL?
+
+ io_uring_prep_write(sqe, notify_fd, &dummy_event, sizeof(dummy_event), 0);
+ io_uring_sqe_set_data(sqe, NULL);
+}*/
+
static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) {
struct io_uring_sqe *sqe;
@@ -654,28 +698,21 @@ static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) {
io_uring_sqe_set_data(sqe, &t->ur_notify_event);
}
-// TODO (v2): CQE's can generate many SQE's, so we might need to occasionally check
-// for space free in the sqe queue and submit in the middle of the cqe
-// foreach.
-// There might be better places to do this, but I think it's cleaner if
-// submission and cqe can stay in this function.
-// TODO (v2): The problem is io_submit() can deadlock if too many cqe's are
-// waiting.
-// Need to understand if this means "CQE's ready to be picked up" or "CQE's in
-// flight", because the former is much easier to work around (ie; only run the
-// backend handler after dequeuing everything else)
// TODO (v2): IOURING_FEAT_NODROP: uring_submit() should return -EBUSY if out of CQ
// events slots. Therefore might starve SQE's if we were low beforehand.
-// - switch from for_each_cqe to doing one at a time (for now?)
-// - track # of sqe's allocated in the cqe loop.
-// - stop and submit if we've >= half the queue.
-// - ??? when can a CQE generate > 1 SQE?
-// - wrhandler_ur can set both wrpoll and read
-// - if CQE's can gen > 1 SQE at a time, we'll eventually starve.
-// - proper flow: CQE's can enqueue backends to be processed.
-// - after CQE's are processed, backends are processed (ouch?)
-// - if SQE's starve here, bail but keep the BE queue.
-// - then submit SQE's
+// - when uring events are armed, they should link into an STAILQ
+// - after all cqe's are processed from the loop, walk the queued events
+// - generate SQE's as necessary, bailing if we run out before running out of
+// events.
+// - submit the SQE's
+// - if it bails on -EBUSY due to too many CQE's, run the CQE loop again
+// - submit if there were pending SQE's before resuming walking the event
+// chain.
+//
+// Think this is the best compromise; doesn't use temporary memory for
+// processing CQE's, and we already have dedicated memory for the SQE side of
+// things so adding a little more for an STAILQ is fine.
+// Until then this code will deadlock and die if -EBUSY happens.
void *proxy_event_thread_ur(void *arg) {
proxy_event_thread_t *t = arg;
struct io_uring_cqe *cqe;
@@ -695,8 +732,10 @@ void *proxy_event_thread_ur(void *arg) {
P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count);
proxy_event_t *pe = io_uring_cqe_get_data(cqe);
- pe->set = false;
- pe->cb(pe->udata, cqe);
+ if (pe != NULL) {
+ pe->set = false;
+ pe->cb(pe->udata, cqe);
+ }
count++;
}
@@ -1252,7 +1291,7 @@ static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) {
return 0;
}
-static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) {
+static int _prep_pending_write(mcp_backend_t *be) {
struct iovec *iovs = be->write_iovs;
io_pending_proxy_t *io = NULL;
int iovused = 0;
@@ -1268,74 +1307,80 @@ static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) {
continue;
if (io->iovcnt + iovused > BE_IOV_MAX) {
- // Signal to caller that we need to keep writing, if writeable.
- // FIXME (v2): can certainly refactor this to loop instead of waiting
- // for a writeable event.
- *tosend += 1;
+ // We will need to keep writing later.
break;
}
memcpy(&iovs[iovused], io->iov, sizeof(struct iovec)*io->iovcnt);
iovused += io->iovcnt;
- *tosend += io->iovbytes;
}
return iovused;
}
+// returns true if any pending writes were fully flushed.
+static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) {
+ io_pending_proxy_t *io = be->io_next;
+ assert(io != NULL);
+
+ bool did_flush = false;
+ for (; io; io = STAILQ_NEXT(io, io_next)) {
+ bool flushed = true;
+ if (io->flushed)
+ continue;
+
+ if (sent >= io->iovbytes) {
+ // short circuit for common case.
+ sent -= io->iovbytes;
+ } else {
+ io->iovbytes -= sent;
+ for (int x = 0; x < io->iovcnt; x++) {
+ struct iovec *iov = &io->iov[x];
+ if (sent >= iov->iov_len) {
+ sent -= iov->iov_len;
+ iov->iov_len = 0;
+ } else {
+ iov->iov_len -= sent;
+ iov->iov_base = (char *)iov->iov_base + sent;
+ sent = 0;
+ flushed = false;
+ break;
+ }
+ }
+ }
+ io->flushed = flushed;
+
+ if (flushed) {
+ did_flush = flushed;
+ be->io_next = STAILQ_NEXT(io, io_next);
+ }
+ if (sent <= 0) {
+ // really shouldn't be negative, though.
+ assert(sent >= 0);
+ break;
+ }
+ } // for
+
+ return did_flush;
+}
+
static int _flush_pending_write(mcp_backend_t *be) {
int flags = 0;
- unsigned int tosend = 0;
// Allow us to be called with an empty stack to prevent dev errors.
if (STAILQ_EMPTY(&be->io_head)) {
return 0;
}
- int iovcnt = _prep_pending_write(be, &tosend);
+ int iovcnt = _prep_pending_write(be);
ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt);
if (sent > 0) {
- io_pending_proxy_t *io = be->io_next;
- assert(io != NULL);
- if (sent < tosend) {
+ if (_post_pending_write(be, sent)) {
+ flags |= EV_READ;
+ }
+ // still have unflushed pending IO's, check for write and re-loop.
+ if (be->io_next) {
flags |= EV_WRITE;
}
-
- for (; io; io = STAILQ_NEXT(io, io_next)) {
- bool flushed = true;
- if (io->flushed)
- continue;
-
- if (sent >= io->iovbytes) {
- // short circuit for common case.
- sent -= io->iovbytes;
- } else {
- io->iovbytes -= sent;
- for (int x = 0; x < io->iovcnt; x++) {
- struct iovec *iov = &io->iov[x];
- if (sent >= iov->iov_len) {
- sent -= iov->iov_len;
- iov->iov_len = 0;
- } else {
- iov->iov_len -= sent;
- iov->iov_base = (char *)iov->iov_base + sent;
- sent = 0;
- flushed = false;
- break;
- }
- }
- }
- io->flushed = flushed;
-
- if (flushed) {
- flags |= EV_READ;
- be->io_next = STAILQ_NEXT(io, io_next);
- }
- if (sent <= 0) {
- // really shouldn't be negative, though.
- assert(sent >= 0);
- break;
- }
- } // for
} else if (sent == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
be->can_write = false;