Diffstat (limited to 'proxy_network.c')
-rw-r--r--  proxy_network.c | 301
1 file changed, 173 insertions(+), 128 deletions(-)
diff --git a/proxy_network.c b/proxy_network.c
index 8ce4bab..9f2c410 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -41,6 +41,8 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg);
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg);
static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
+static int _prep_pending_write(mcp_backend_t *be);
+static bool _post_pending_write(mcp_backend_t *be, ssize_t sent);
static int _flush_pending_write(mcp_backend_t *be);
static void _cleanup_backend(mcp_backend_t *be);
static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err);
@@ -163,7 +165,9 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
}
#ifdef HAVE_LIBURING
+//static void _proxy_evthr_evset_wnotify(proxy_event_thread_t *t, int notify_fd);
static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
+static void _proxy_evthr_evset_be_writev(mcp_backend_t *be, int iovcnt, struct __kernel_timespec *ts);
static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts);
static void _proxy_evthr_evset_be_retry(mcp_backend_t *be);
static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timespec *ts);
@@ -172,8 +176,55 @@ static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t);
static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t);
static void _proxy_evthr_evset_clock(proxy_event_thread_t *t);
static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe);
+static void _backend_failed_ur(mcp_backend_t *be);
struct __kernel_timespec updater_ts = {.tv_sec = 3, .tv_nsec = 0};
+static void _flush_pending_write_ur(mcp_backend_t *be) {
+ // Allow us to be called with an empty stack to prevent dev errors.
+ if (STAILQ_EMPTY(&be->io_head)) {
+ return;
+ }
+
+ int iovcnt = _prep_pending_write(be);
+
+ // TODO: write timeout.
+ _proxy_evthr_evset_be_writev(be, iovcnt, &be->event_thread->tunables.read_ur);
+}
+
+// TODO: we shouldn't handle reads if a write is pending, so postwrite should
+// check for pending read data before going into read mode.
+// need be->writing flag to toggle?
+static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ P_DEBUG("%s: %d\n", __func__, cqe->res);
+ assert(cqe->res != -EINVAL);
+ int sent = cqe->res;
+ if (sent < 0) {
+ // FIXME: sent == 0 is disconnected? I keep forgetting.
+ if (sent == -EAGAIN || sent == -EWOULDBLOCK) {
+ // didn't do any writing, wait for a writeable socket.
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur);
+ } else {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed_ur(be);
+ }
+ // Bail out here: the backend was either reset or parked on wrpoll, so
+ // the flush accounting below must not run with a negative 'sent'.
+ return;
+ }
+
+ if (_post_pending_write(be, sent)) {
+ // commands were flushed, set read handler.
+ _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
+ }
+
+ if (be->io_next) {
+ // still have unflushed commands, re-run write command.
+ // writev can't "block if EAGAIN" in io_uring so far as I can tell, so
+ // we have to switch to polling mode here.
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur);
+ }
+
+ // TODO: if rbufused != 0, push through drive machine?
+}
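// A minimal standalone sketch (not proxy code) of the poll-then-retry pattern
// the -EAGAIN branch above depends on: arm a one-shot POLLOUT poll SQE and
// only re-issue the write once its CQE fires. Assumes <liburing.h> and
// <poll.h>, an initialized ring, and a connected non-blocking socket fd.
static void arm_pollout_retry(struct io_uring *ring, int fd, void *udata) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (sqe == NULL) {
        return; // SQ ring full; caller must io_uring_submit() and retry.
    }
    io_uring_prep_poll_add(sqe, fd, POLLOUT);
    io_uring_sqe_set_data(sqe, udata); // handler re-runs the writev on wake.
}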
+
static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) {
proxy_event_thread_t *t = udata;
proxy_ctx_t *ctx = t->ctx;
@@ -256,43 +307,16 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
mcp_backend_t *be = udata;
- int flags = 0;
be->can_write = true;
- if (be->connecting) {
- int err = 0;
- // We were connecting, now ensure we're properly connected.
- if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
- // kick the bad backend, clear the queue, retry later.
- // TODO (v2): if a connect fails, anything currently in the queue
- // should be safe to hold up until their timeout.
- _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
- _backend_failed_ur(be);
- P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
- return;
- }
- P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
- be->connecting = false;
- be->state = mcp_backend_read;
- be->bad = false;
- be->failed_count = 0;
- }
- int res = _flush_pending_write(be);
- if (res == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- return;
- }
-
- if (flags & EV_WRITE) {
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur);
- }
+ _flush_pending_write_ur(be);
- _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
}
// a backend with an outstanding new connection has become writeable.
// check validity.
-// TODO: this gets an error if cancelled right? check it?
+// TODO: this gets an error if cancelled right?
static void proxy_backend_beconn_ur(void *udata, struct io_uring_cqe *cqe) {
mcp_backend_t *be = udata;
int err = 0;
@@ -373,16 +397,7 @@ static void proxy_backend_beconn_validate_ur(void *udata, struct io_uring_cqe *c
be->rbufused = 0;
// Passed validation, don't need to re-read, flush any pending writes.
- int res = _flush_pending_write(be);
- if (res == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- _backend_failed_ur(be);
- return;
- }
-
- if (!STAILQ_EMPTY(&be->io_head)) {
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
- }
+ _flush_pending_write_ur(be);
}
// TODO (v3): much code shared with proxy_event_beconn, should be able to
@@ -474,27 +489,11 @@ static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
// move the backend sqe bits into a write complete handler
STAILQ_FOREACH(be, &t->be_head, be_next) {
be->stacked = false;
- int flags = 0;
- if (be->connecting) {
+ if (be->connecting || be->validating) {
P_DEBUG("%s: deferring IO pending connecting\n", __func__);
- flags |= EV_WRITE;
- } else {
- flags = _flush_pending_write(be);
- }
-
- if (flags == -1) {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
} else {
- // FIXME (v2): needs a re-write to handle sqe starvation.
- // FIXME (v2): can't actually set the read here? need to confirm _some_
- // write first?
- if (flags & EV_WRITE) {
- _proxy_evthr_evset_be_wrpoll(be, &t->tunables.connect_ur);
- }
- if (flags & EV_READ) {
- _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &t->tunables.read_ur);
- }
+ _flush_pending_write_ur(be);
}
}
}
@@ -557,6 +556,37 @@ static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timesp
io_uring_sqe_set_data(sqe, &be->ur_te_ev);
}
+// reuses ur_wr_ev, so a wrpoll and a writev can never be armed at the same time.
+static void _proxy_evthr_evset_be_writev(mcp_backend_t *be, int iovcnt, struct __kernel_timespec *ts) {
+ struct io_uring_sqe *sqe;
+ if (be->ur_wr_ev.set)
+ return;
+
+ be->ur_wr_ev.cb = proxy_backend_postwrite_ur;
+ be->ur_wr_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // FIXME (v2): NULL?
+
+ if (iovcnt == 1) {
+ io_uring_prep_write(sqe, mcmc_fd(be->client), be->write_iovs[0].iov_base, be->write_iovs[0].iov_len, 0);
+ } else {
+ io_uring_prep_writev(sqe, mcmc_fd(be->client), be->write_iovs, iovcnt, 0);
+ }
+ io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
+ be->ur_wr_ev.set = true;
+
+ sqe->flags |= IOSQE_IO_LINK;
+
+ // add a timeout.
+ be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
+ be->ur_te_ev.udata = be;
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+
+ io_uring_prep_link_timeout(sqe, ts, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+}
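// A standalone sketch of the linked-timeout pattern used above, with
// illustrative names: IOSQE_IO_LINK chains the write SQE to a following
// LINK_TIMEOUT SQE, so the kernel cancels the write if it hasn't completed
// within ts. Assumes an initialized ring; error handling elided.
static void write_with_timeout(struct io_uring *ring, int fd, const void *buf,
        size_t len, struct __kernel_timespec *ts) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_write(sqe, fd, buf, len, 0);
    sqe->flags |= IOSQE_IO_LINK; // the next SQE becomes this one's timeout.

    struct io_uring_sqe *tsqe = io_uring_get_sqe(ring);
    io_uring_prep_link_timeout(tsqe, ts, 0);
    io_uring_sqe_set_data(tsqe, NULL); // NULL data: skipped by the CQE loop.

    io_uring_submit(ring);
}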
+
static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts) {
struct io_uring_sqe *sqe;
if (be->ur_wr_ev.set)
@@ -611,8 +641,22 @@ static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len,
io_uring_prep_link_timeout(sqe, ts, 0);
io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+
}
+// FIXME: can this be inside the function?
+//static eventfd_t dummy_event = 1;
+// TODO: in newer versions of uring we can set ignore success?
+/*static void _proxy_evthr_evset_wnotify(proxy_event_thread_t *t, int notify_fd) {
+ struct io_uring_sqe *sqe;
+
+ sqe = io_uring_get_sqe(&t->ring);
+ // FIXME (v2) NULL?
+
+ io_uring_prep_write(sqe, notify_fd, &dummy_event, sizeof(dummy_event), 0);
+ io_uring_sqe_set_data(sqe, NULL);
+}*/
+
static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) {
struct io_uring_sqe *sqe;
@@ -654,28 +698,21 @@ static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) {
io_uring_sqe_set_data(sqe, &t->ur_notify_event);
}
-// TODO (v2): CQE's can generate many SQE's, so we might need to occasionally check
-// for space free in the sqe queue and submit in the middle of the cqe
-// foreach.
-// There might be better places to do this, but I think it's cleaner if
-// submission and cqe can stay in this function.
-// TODO (v2): The problem is io_submit() can deadlock if too many cqe's are
-// waiting.
-// Need to understand if this means "CQE's ready to be picked up" or "CQE's in
-// flight", because the former is much easier to work around (ie; only run the
-// backend handler after dequeuing everything else)
// TODO (v2): IORING_FEAT_NODROP: io_uring_submit() should return -EBUSY if we
// run out of CQE slots, which might starve SQE's if we were already low.
-// - switch from for_each_cqe to doing one at a time (for now?)
-// - track # of sqe's allocated in the cqe loop.
-// - stop and submit if we've >= half the queue.
-// - ??? when can a CQE generate > 1 SQE?
-// - wrhandler_ur can set both wrpoll and read
-// - if CQE's can gen > 1 SQE at a time, we'll eventually starve.
-// - proper flow: CQE's can enqueue backends to be processed.
-// - after CQE's are processed, backends are processed (ouch?)
-// - if SQE's starve here, bail but keep the BE queue.
-// - then submit SQE's
+// - when uring events are armed, they should link into an STAILQ
+// - after all cqe's are processed from the loop, walk the queued events
+// - generate SQE's as necessary, bailing if we run out before running out of
+// events.
+// - submit the SQE's
+// - if it bails on -EBUSY due to too many CQE's, run the CQE loop again
+// - submit if there were pending SQE's before resuming walking the event
+// chain.
+//
+// Think this is the best compromise; doesn't use temporary memory for
+// processing CQE's, and we already have dedicated memory for the SQE side of
+// things so adding a little more for an STAILQ is fine.
+// Until then this code will deadlock and die if -EBUSY happens.
void *proxy_event_thread_ur(void *arg) {
proxy_event_thread_t *t = arg;
struct io_uring_cqe *cqe;
@@ -695,8 +732,10 @@ void *proxy_event_thread_ur(void *arg) {
P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count);
proxy_event_t *pe = io_uring_cqe_get_data(cqe);
- pe->set = false;
- pe->cb(pe->udata, cqe);
+ if (pe != NULL) {
+ pe->set = false;
+ pe->cb(pe->udata, cqe);
+ }
count++;
}
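// A rough sketch of the STAILQ plan from the TODO above. Everything here is
// hypothetical: an 'armed' list on the thread, and a 'pending' link plus a
// 'prep' callback on proxy_event_t, do not exist yet. CQE handlers would
// enqueue events instead of grabbing SQEs directly; this drain step then arms
// as many as the SQ ring allows, so a full ring defers work rather than
// deadlocking.
static void drain_armed_events(proxy_event_thread_t *t) {
    proxy_event_t *pe;
    while ((pe = STAILQ_FIRST(&t->armed)) != NULL) {
        struct io_uring_sqe *sqe = io_uring_get_sqe(&t->ring);
        if (sqe == NULL) {
            // SQ ring full: flush what we have, then keep walking the queue.
            io_uring_submit(&t->ring);
            continue;
        }
        STAILQ_REMOVE_HEAD(&t->armed, pending);
        pe->prep(sqe, pe); // hypothetical per-event SQE fill-in.
    }
    io_uring_submit(&t->ring);
}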
@@ -1252,7 +1291,7 @@ static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) {
return 0;
}
-static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) {
+static int _prep_pending_write(mcp_backend_t *be) {
struct iovec *iovs = be->write_iovs;
io_pending_proxy_t *io = NULL;
int iovused = 0;
@@ -1268,74 +1307,80 @@ static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) {
continue;
if (io->iovcnt + iovused > BE_IOV_MAX) {
- // Signal to caller that we need to keep writing, if writeable.
- // FIXME (v2): can certainly refactor this to loop instead of waiting
- // for a writeable event.
- *tosend += 1;
+ // We will need to keep writing later.
break;
}
memcpy(&iovs[iovused], io->iov, sizeof(struct iovec)*io->iovcnt);
iovused += io->iovcnt;
- *tosend += io->iovbytes;
}
return iovused;
}
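// Usage sketch of the prep/post split (this is what _flush_pending_write
// below does): one gather pass serves both the writev() syscall path and the
// io_uring path. BE_IOV_MAX bounds the batch because writev() fails with
// EINVAL past IOV_MAX (commonly 1024) entries, e.g.:
//
//   int iovcnt = _prep_pending_write(be);
//   ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt);
//   if (sent > 0 && _post_pending_write(be, sent)) {
//       // fully-flushed commands exist; arm a read for the responses.
//   }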
+// returns true if any pending writes were fully flushed.
+static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) {
+ io_pending_proxy_t *io = be->io_next;
+ assert(io != NULL);
+
+ bool did_flush = false;
+ for (; io; io = STAILQ_NEXT(io, io_next)) {
+ bool flushed = true;
+ if (io->flushed)
+ continue;
+
+ if (sent >= io->iovbytes) {
+ // short circuit for common case.
+ sent -= io->iovbytes;
+ } else {
+ io->iovbytes -= sent;
+ for (int x = 0; x < io->iovcnt; x++) {
+ struct iovec *iov = &io->iov[x];
+ if (sent >= iov->iov_len) {
+ sent -= iov->iov_len;
+ iov->iov_len = 0;
+ } else {
+ iov->iov_len -= sent;
+ iov->iov_base = (char *)iov->iov_base + sent;
+ sent = 0;
+ flushed = false;
+ break;
+ }
+ }
+ }
+ io->flushed = flushed;
+
+ if (flushed) {
+ did_flush = flushed;
+ be->io_next = STAILQ_NEXT(io, io_next);
+ }
+ if (sent <= 0) {
+ // really shouldn't be negative, though.
+ assert(sent >= 0);
+ break;
+ }
+ } // for
+
+ return did_flush;
+}
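// Worked example of the accounting above: two queued IOs with iovbytes of
// 10 and 20, and writev() reporting sent == 15. The first IO is covered in
// full (15 >= 10; sent drops to 5, flushed = true, be->io_next advances).
// The second absorbs the remaining 5: its iovbytes becomes 15 and, assuming
// its first iovec is longer than 5 bytes, that iovec's base/len shift by 5
// so the next writev() resumes mid-buffer. sent hits 0, the loop breaks,
// and the function returns true.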
+
static int _flush_pending_write(mcp_backend_t *be) {
int flags = 0;
- unsigned int tosend = 0;
// Allow us to be called with an empty stack to prevent dev errors.
if (STAILQ_EMPTY(&be->io_head)) {
return 0;
}
- int iovcnt = _prep_pending_write(be, &tosend);
+ int iovcnt = _prep_pending_write(be);
ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt);
if (sent > 0) {
- io_pending_proxy_t *io = be->io_next;
- assert(io != NULL);
- if (sent < tosend) {
+ if (_post_pending_write(be, sent)) {
+ flags |= EV_READ;
+ }
+ // still have unflushed pending IO's, check for write and re-loop.
+ if (be->io_next) {
flags |= EV_WRITE;
}
-
- for (; io; io = STAILQ_NEXT(io, io_next)) {
- bool flushed = true;
- if (io->flushed)
- continue;
-
- if (sent >= io->iovbytes) {
- // short circuit for common case.
- sent -= io->iovbytes;
- } else {
- io->iovbytes -= sent;
- for (int x = 0; x < io->iovcnt; x++) {
- struct iovec *iov = &io->iov[x];
- if (sent >= iov->iov_len) {
- sent -= iov->iov_len;
- iov->iov_len = 0;
- } else {
- iov->iov_len -= sent;
- iov->iov_base = (char *)iov->iov_base + sent;
- sent = 0;
- flushed = false;
- break;
- }
- }
- }
- io->flushed = flushed;
-
- if (flushed) {
- flags |= EV_READ;
- be->io_next = STAILQ_NEXT(io, io_next);
- }
- if (sent <= 0) {
- // really shouldn't be negative, though.
- assert(sent >= 0);
- break;
- }
- } // for
} else if (sent == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
be->can_write = false;