diff options
Diffstat (limited to 'proxy_network.c')
-rw-r--r-- | proxy_network.c | 301 |
1 files changed, 173 insertions, 128 deletions
diff --git a/proxy_network.c b/proxy_network.c index 8ce4bab..9f2c410 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -41,6 +41,8 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg); static void proxy_event_handler(evutil_socket_t fd, short which, void *arg); static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg); static void proxy_event_updater(evutil_socket_t fd, short which, void *arg); +static int _prep_pending_write(mcp_backend_t *be); +static bool _post_pending_write(mcp_backend_t *be, ssize_t sent); static int _flush_pending_write(mcp_backend_t *be); static void _cleanup_backend(mcp_backend_t *be); static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err); @@ -163,7 +165,9 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) { } #ifdef HAVE_LIBURING +//static void _proxy_evthr_evset_wnotify(proxy_event_thread_t *t, int notify_fd); static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts); +static void _proxy_evthr_evset_be_writev(mcp_backend_t *be, int iovcnt, struct __kernel_timespec *ts); static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts); static void _proxy_evthr_evset_be_retry(mcp_backend_t *be); static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timespec *ts); @@ -172,8 +176,55 @@ static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t); static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t); static void _proxy_evthr_evset_clock(proxy_event_thread_t *t); static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe); +static void _backend_failed_ur(mcp_backend_t *be); struct __kernel_timespec updater_ts = {.tv_sec = 3, .tv_nsec = 0}; +static void _flush_pending_write_ur(mcp_backend_t *be) { + // Allow us to be called with an empty stack to prevent dev errors. + if (STAILQ_EMPTY(&be->io_head)) { + return; + } + + int iovcnt = _prep_pending_write(be); + + // TODO: write timeout. + _proxy_evthr_evset_be_writev(be, iovcnt, &be->event_thread->tunables.read_ur); +} + +// TODO: we shouldn't handle reads if a write is pending, so postwrite should +// check for pending read data before going into read mode. +// need be->writing flag to toggle? +static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) { + mcp_backend_t *be = udata; + P_DEBUG("%s: %d\n", __func__, cqe->res); + assert(cqe->res != -EINVAL); + int sent = cqe->res; + if (sent < 0) { + // FIXME: sent == 0 is disconnected? I keep forgetting. + if (sent == -EAGAIN || sent == -EWOULDBLOCK) { + // didn't do any writing, wait for a writeable socket. + _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur); + } else { + _reset_bad_backend(be, P_BE_FAIL_WRITING); + _backend_failed_ur(be); + } + } + + if (_post_pending_write(be, sent)) { + // commands were flushed, set read handler. + _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur); + } + + if (be->io_next) { + // still have unflushed commands, re-run write command. + // writev can't "block if EAGAIN" in io_uring so far as I can tell, so + // we have to switch to polling mode here. + _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur); + } + + // TODO: if rbufused != 0, push through drive machine? +} + static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) { proxy_event_thread_t *t = udata; proxy_ctx_t *ctx = t->ctx; @@ -256,43 +307,16 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) { static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) { mcp_backend_t *be = udata; - int flags = 0; be->can_write = true; - if (be->connecting) { - int err = 0; - // We were connecting, now ensure we're properly connected. - if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) { - // kick the bad backend, clear the queue, retry later. - // TODO (v2): if a connect fails, anything currently in the queue - // should be safe to hold up until their timeout. - _reset_bad_backend(be, P_BE_FAIL_CONNECTING); - _backend_failed_ur(be); - P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port); - return; - } - P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port); - be->connecting = false; - be->state = mcp_backend_read; - be->bad = false; - be->failed_count = 0; - } - int res = _flush_pending_write(be); - if (res == -1) { - _reset_bad_backend(be, P_BE_FAIL_WRITING); - return; - } - - if (flags & EV_WRITE) { - _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur); - } + _flush_pending_write_ur(be); - _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur); } // a backend with an outstanding new connection has become writeable. // check validity. -// TODO: this gets an error if cancelled right? check it? +// TODO: this gets an error if cancelled right? static void proxy_backend_beconn_ur(void *udata, struct io_uring_cqe *cqe) { mcp_backend_t *be = udata; int err = 0; @@ -373,16 +397,7 @@ static void proxy_backend_beconn_validate_ur(void *udata, struct io_uring_cqe *c be->rbufused = 0; // Passed validation, don't need to re-read, flush any pending writes. - int res = _flush_pending_write(be); - if (res == -1) { - _reset_bad_backend(be, P_BE_FAIL_WRITING); - _backend_failed_ur(be); - return; - } - - if (!STAILQ_EMPTY(&be->io_head)) { - _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur); - } + _flush_pending_write(be); } // TODO (v3): much code shared with proxy_event_beconn, should be able to @@ -474,27 +489,11 @@ static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) { // move the backend sqe bits into a write complete handler STAILQ_FOREACH(be, &t->be_head, be_next) { be->stacked = false; - int flags = 0; - if (be->connecting) { + if (be->connecting || be->validating) { P_DEBUG("%s: deferring IO pending connecting\n", __func__); - flags |= EV_WRITE; - } else { - flags = _flush_pending_write(be); - } - - if (flags == -1) { - _reset_bad_backend(be, P_BE_FAIL_WRITING); } else { - // FIXME (v2): needs a re-write to handle sqe starvation. - // FIXME (v2): can't actually set the read here? need to confirm _some_ - // write first? - if (flags & EV_WRITE) { - _proxy_evthr_evset_be_wrpoll(be, &t->tunables.connect_ur); - } - if (flags & EV_READ) { - _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &t->tunables.read_ur); - } + _flush_pending_write_ur(be); } } } @@ -557,6 +556,37 @@ static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timesp io_uring_sqe_set_data(sqe, &be->ur_te_ev); } +// reusing the ur_wr_ev. +static void _proxy_evthr_evset_be_writev(mcp_backend_t *be, int iovcnt, struct __kernel_timespec *ts) { + struct io_uring_sqe *sqe; + if (be->ur_wr_ev.set) + return; + + be->ur_wr_ev.cb = proxy_backend_postwrite_ur; + be->ur_wr_ev.udata = be; + + sqe = io_uring_get_sqe(&be->event_thread->ring); + // FIXME (v2): NULL? + + if (iovcnt == 1) { + io_uring_prep_write(sqe, mcmc_fd(be->client), be->write_iovs[0].iov_base, be->write_iovs[0].iov_len, 0); + } else { + io_uring_prep_writev(sqe, mcmc_fd(be->client), be->write_iovs, iovcnt, 0); + } + io_uring_sqe_set_data(sqe, &be->ur_wr_ev); + be->ur_wr_ev.set = true; + + sqe->flags |= IOSQE_IO_LINK; + + // add a timeout. + be->ur_te_ev.cb = proxy_backend_timeout_handler_ur; + be->ur_te_ev.udata = be; + sqe = io_uring_get_sqe(&be->event_thread->ring); + + io_uring_prep_link_timeout(sqe, ts, 0); + io_uring_sqe_set_data(sqe, &be->ur_te_ev); +} + static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts) { struct io_uring_sqe *sqe; if (be->ur_wr_ev.set) @@ -611,8 +641,22 @@ static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, io_uring_prep_link_timeout(sqe, ts, 0); io_uring_sqe_set_data(sqe, &be->ur_te_ev); + } +// FIXME: can this be inside the function? +//static eventfd_t dummy_event = 1; +// TODO: in newer versions of uring we can set ignore success? +/*static void _proxy_evthr_evset_wnotify(proxy_event_thread_t *t, int notify_fd) { + struct io_uring_sqe *sqe; + + sqe = io_uring_get_sqe(&t->ring); + // FIXME (v2) NULL? + + io_uring_prep_write(sqe, notify_fd, &dummy_event, sizeof(dummy_event), 0); + io_uring_sqe_set_data(sqe, NULL); +}*/ + static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) { struct io_uring_sqe *sqe; @@ -654,28 +698,21 @@ static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) { io_uring_sqe_set_data(sqe, &t->ur_notify_event); } -// TODO (v2): CQE's can generate many SQE's, so we might need to occasionally check -// for space free in the sqe queue and submit in the middle of the cqe -// foreach. -// There might be better places to do this, but I think it's cleaner if -// submission and cqe can stay in this function. -// TODO (v2): The problem is io_submit() can deadlock if too many cqe's are -// waiting. -// Need to understand if this means "CQE's ready to be picked up" or "CQE's in -// flight", because the former is much easier to work around (ie; only run the -// backend handler after dequeuing everything else) // TODO (v2): IOURING_FEAT_NODROP: uring_submit() should return -EBUSY if out of CQ // events slots. Therefore might starve SQE's if we were low beforehand. -// - switch from for_each_cqe to doing one at a time (for now?) -// - track # of sqe's allocated in the cqe loop. -// - stop and submit if we've >= half the queue. -// - ??? when can a CQE generate > 1 SQE? -// - wrhandler_ur can set both wrpoll and read -// - if CQE's can gen > 1 SQE at a time, we'll eventually starve. -// - proper flow: CQE's can enqueue backends to be processed. -// - after CQE's are processed, backends are processed (ouch?) -// - if SQE's starve here, bail but keep the BE queue. -// - then submit SQE's +// - when uring events are armed, they should link into an STAILQ +// - after all cqe's are processed from the loop, walk the queued events +// - generate SQE's as necessary, bailing if we run out before running out of +// events. +// - submit the SQE's +// - if it bails on -EBUSY due to too many CQE's, run the CQE loop again +// - submit if there were pending SQE's before resuming walking the event +// chain. +// +// Think this is the best compromise; doesn't use temporary memory for +// processing CQE's, and we already have dedicated memory for the SQE side of +// things so adding a little more for an STAILQ is fine. +// Until then this code will deadlock and die if -EBUSY happens. void *proxy_event_thread_ur(void *arg) { proxy_event_thread_t *t = arg; struct io_uring_cqe *cqe; @@ -695,8 +732,10 @@ void *proxy_event_thread_ur(void *arg) { P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count); proxy_event_t *pe = io_uring_cqe_get_data(cqe); - pe->set = false; - pe->cb(pe->udata, cqe); + if (pe != NULL) { + pe->set = false; + pe->cb(pe->udata, cqe); + } count++; } @@ -1252,7 +1291,7 @@ static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) { return 0; } -static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) { +static int _prep_pending_write(mcp_backend_t *be) { struct iovec *iovs = be->write_iovs; io_pending_proxy_t *io = NULL; int iovused = 0; @@ -1268,74 +1307,80 @@ static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) { continue; if (io->iovcnt + iovused > BE_IOV_MAX) { - // Signal to caller that we need to keep writing, if writeable. - // FIXME (v2): can certainly refactor this to loop instead of waiting - // for a writeable event. - *tosend += 1; + // We will need to keep writing later. break; } memcpy(&iovs[iovused], io->iov, sizeof(struct iovec)*io->iovcnt); iovused += io->iovcnt; - *tosend += io->iovbytes; } return iovused; } +// returns true if any pending writes were fully flushed. +static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) { + io_pending_proxy_t *io = be->io_next; + assert(io != NULL); + + bool did_flush = false; + for (; io; io = STAILQ_NEXT(io, io_next)) { + bool flushed = true; + if (io->flushed) + continue; + + if (sent >= io->iovbytes) { + // short circuit for common case. + sent -= io->iovbytes; + } else { + io->iovbytes -= sent; + for (int x = 0; x < io->iovcnt; x++) { + struct iovec *iov = &io->iov[x]; + if (sent >= iov->iov_len) { + sent -= iov->iov_len; + iov->iov_len = 0; + } else { + iov->iov_len -= sent; + iov->iov_base = (char *)iov->iov_base + sent; + sent = 0; + flushed = false; + break; + } + } + } + io->flushed = flushed; + + if (flushed) { + did_flush = flushed; + be->io_next = STAILQ_NEXT(io, io_next); + } + if (sent <= 0) { + // really shouldn't be negative, though. + assert(sent >= 0); + break; + } + } // for + + return did_flush; +} + static int _flush_pending_write(mcp_backend_t *be) { int flags = 0; - unsigned int tosend = 0; // Allow us to be called with an empty stack to prevent dev errors. if (STAILQ_EMPTY(&be->io_head)) { return 0; } - int iovcnt = _prep_pending_write(be, &tosend); + int iovcnt = _prep_pending_write(be); ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt); if (sent > 0) { - io_pending_proxy_t *io = be->io_next; - assert(io != NULL); - if (sent < tosend) { + if (_post_pending_write(be, sent)) { + flags |= EV_READ; + } + // still have unflushed pending IO's, check for write and re-loop. + if (be->io_next) { flags |= EV_WRITE; } - - for (; io; io = STAILQ_NEXT(io, io_next)) { - bool flushed = true; - if (io->flushed) - continue; - - if (sent >= io->iovbytes) { - // short circuit for common case. - sent -= io->iovbytes; - } else { - io->iovbytes -= sent; - for (int x = 0; x < io->iovcnt; x++) { - struct iovec *iov = &io->iov[x]; - if (sent >= iov->iov_len) { - sent -= iov->iov_len; - iov->iov_len = 0; - } else { - iov->iov_len -= sent; - iov->iov_base = (char *)iov->iov_base + sent; - sent = 0; - flushed = false; - break; - } - } - } - io->flushed = flushed; - - if (flushed) { - flags |= EV_READ; - be->io_next = STAILQ_NEXT(io, io_next); - } - if (sent <= 0) { - // really shouldn't be negative, though. - assert(sent >= 0); - break; - } - } // for } else if (sent == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { be->can_write = false; |