Diffstat (limited to 'proxy_network.c')
-rw-r--r--  proxy_network.c  626
1 file changed, 3 insertions(+), 623 deletions(-)
diff --git a/proxy_network.c b/proxy_network.c
index 1d9db29..ca56baa 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -160,569 +160,7 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
return io_count;
}
-#ifdef HAVE_LIBURING
-//static void _proxy_evthr_evset_wnotify(proxy_event_thread_t *t, int notify_fd);
-static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
-static void _proxy_evthr_evset_be_writev(mcp_backend_t *be, int iovcnt, struct __kernel_timespec *ts);
-static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts);
-static void _proxy_evthr_evset_be_retry(mcp_backend_t *be);
-static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timespec *ts);
-static void _proxy_evthr_evset_be_readvalidate(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
-static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t);
-static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t);
-static void _backend_failed_ur(mcp_backend_t *be);
-
-static void _flush_pending_write_ur(mcp_backend_t *be) {
- // Allow us to be called with an empty stack to prevent dev errors.
- if (STAILQ_EMPTY(&be->io_head)) {
- return;
- }
-
- int iovcnt = _prep_pending_write(be);
-
- // TODO: write timeout.
- _proxy_evthr_evset_be_writev(be, iovcnt, &be->tunables.read_ur);
-}
-
-// TODO: we shouldn't handle reads if a write is pending, so postwrite should
-// check for pending read data before going into read mode.
-// need be->writing flag to toggle?
-static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- P_DEBUG("%s: %d\n", __func__, cqe->res);
- assert(cqe->res != -EINVAL);
- int sent = cqe->res;
- if (sent < 0) {
- // FIXME: sent == 0 is disconnected? I keep forgetting.
- if (sent == -EAGAIN || sent == -EWOULDBLOCK) {
- // didn't do any writing, wait for a writeable socket.
- _proxy_evthr_evset_be_wrpoll(be, &be->tunables.read_ur);
- } else {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- _backend_failed_ur(be);
- }
- }
-
- if (_post_pending_write(be, sent)) {
- // commands were flushed, set read handler.
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur);
- }
-
- if (be->io_next) {
- // still have unflushed commands, re-run write command.
- // writev can't "block if EAGAIN" in io_uring so far as I can tell, so
- // we have to switch to polling mode here.
- _proxy_evthr_evset_be_wrpoll(be, &be->tunables.read_ur);
- }
-
- // TODO: if rbufused != 0, push through drive machine?
-}
-
-// No-op at the moment. when the linked timeout fires uring returns the
-// linked request (read/write/poll/etc) with an interrupted/timeout/cancelled
-// error. So we don't need to explicitly handle timeouts.
-// I'm leaving the structure in to simplify the callback routine.
-// Since timeouts rarely get called the extra code here shouldn't matter.
-static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- return;
-}
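
For context on why the handler above can be a no-op: a linked timeout's outcome is always reflected in the CQE of the operation it guards, so the timeout CQE itself carries no information the read/write handlers don't already see. A minimal illustrative sketch (standalone, not from this file; ring, fd, buf, len and ts are assumed to exist):

    // Hypothetical sketch of the linked-timeout pattern used throughout
    // this file: arm a recv, then link a timeout SQE to it.
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_recv(sqe, fd, buf, len, 0);
    sqe->flags |= IOSQE_IO_LINK;          // chain the next SQE to this one.
    sqe = io_uring_get_sqe(ring);
    io_uring_prep_link_timeout(sqe, &ts, 0);
    io_uring_submit(ring);
    // Two CQEs come back. If the timeout fires first, the recv completes
    // with res == -ECANCELED and the timeout CQE carries res == -ETIME; if
    // the recv finishes first, the timeout CQE is reaped with -ECANCELED.
    // Either way the primary handler's error path sees the failure, so
    // the timeout handler itself has nothing to do.
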
-
-static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur);
-}
-
-static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
- struct io_uring_sqe *sqe;
- if (be->ur_te_ev.set)
- return;
-
- be->ur_te_ev.cb = proxy_backend_retry_handler_ur;
- be->ur_te_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // TODO (v2): NULL?
-
- io_uring_prep_timeout(sqe, &be->tunables.retry_ur, 0, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
- be->ur_te_ev.set = true;
-}
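
Note that this arms a standalone IORING_OP_TIMEOUT rather than the linked timeouts used elsewhere in this file: with a count of 0 it simply fires once the timespec elapses, completing with res == -ETIME, which proxy_backend_retry_handler_ur deliberately ignores before re-arming the connect. An illustrative one-shot timer sketch (assumed names, not from this file):

    // Hypothetical one-shot timer: with count == 0 the CQE arrives with
    // res == -ETIME once 'ts' elapses.
    struct __kernel_timespec ts = { .tv_sec = 5, .tv_nsec = 0 };
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_timeout(sqe, &ts, 0, 0);
    io_uring_sqe_set_data(sqe, my_event);
    io_uring_submit(ring);
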
-
-static void _backend_failed_ur(mcp_backend_t *be) {
- if (++be->failed_count > be->tunables.backend_failure_limit) {
- P_DEBUG("%s: marking backend as bad\n", __func__);
- be->bad = true;
- _proxy_evthr_evset_be_retry(be);
- STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
- } else {
- _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur);
- STAT_INCR(be->event_thread->ctx, backend_failed, 1);
- }
-}
-
-// read handler.
-static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- int bread = cqe->res;
- // Error or disconnection.
- if (bread <= 0) {
- _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
- _backend_failed_ur(be);
- return;
- }
-
- be->rbufused += bread;
- int res = proxy_backend_drive_machine(be);
-
- if (res != 0) {
- _reset_bad_backend(be, res);
- _backend_failed_ur(be);
- return;
- }
-
- // TODO (v2): when exactly do we need to reset the backend handler?
- if (!STAILQ_EMPTY(&be->io_head)) {
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur);
- }
-}
-
-static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
-
- be->can_write = true;
- _flush_pending_write_ur(be);
-
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur);
-}
-
-// a backend with an outstanding new connection has become writeable.
-// check validity.
-// TODO: this gets an error if cancelled right?
-static void proxy_backend_beconn_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- int err = 0;
- assert(be->connecting);
-/* if (_proxy_beconn_checkconnect(be) == -1) {
- return;
- } */
-
- // We were connecting, now ensure we're properly connected.
- if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
- P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
- // kick the bad backend, clear the queue, retry later.
- // FIXME (v2): if a connect fails, anything currently in the queue
- // should be safe to hold up until their timeout.
- _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
- _backend_failed_ur(be);
- return;
- }
- P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
- be->connecting = false;
- be->state = mcp_backend_read;
- be->bad = false;
- be->failed_count = 0;
-
- be->validating = true;
- // TODO: make validation optional.
-
- if (_beconn_send_validate(be) == -1) {
- _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
- _backend_failed_ur(be);
- return;
- } else {
- // buffer should be empty during validation stage.
- assert(be->rbufused == 0);
- }
-
- // TODO: make validation optional.
- // set next handler on recv for validity check.
- _proxy_evthr_evset_be_readvalidate(be, be->rbuf, READ_BUFFER_SIZE, &be->tunables.read_ur);
-}
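
mcmc_check_nonblock_connect() belongs to the vendored mcmc client library; its internals aren't shown here, but the conventional way to verify a non-blocking connect once the socket polls writeable is the SO_ERROR check below (an assumption-laden sketch, not mcmc's actual code):

    #include <sys/socket.h>

    // Hypothetical equivalent check: once a non-blocking connect(2)
    // reports the socket writeable, SO_ERROR holds the deferred result.
    static int check_nonblock_connect(int fd) {
        int err = 0;
        socklen_t errlen = sizeof(err);
        if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) != 0
                || err != 0) {
            return -1; // connect failed; err carries the errno.
        }
        return 0;
    }
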
-
-// TODO: share more code with proxy_beconn_handler
-static void proxy_backend_beconn_validate_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- mcmc_resp_t r;
- assert(be->validating);
- assert(cqe->res != -EINVAL);
- P_DEBUG("%s: checking validation: %d\n", __func__, cqe->res);
-
- int bread = cqe->res;
- // Error or disconnection.
- if (bread <= 0) {
- _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
- _backend_failed_ur(be);
- return;
- }
-
- be->rbufused += bread;
-
- int status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r);
- if (status == MCMC_ERR) {
- // Needed more data for a version line, somehow. For the uring code
- // we'll treat that as an error, for now.
- // TODO: re-schedule self if r.code == MCMC_WANT_READ.
-
- _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
- _backend_failed_ur(be);
- return;
- }
-
- if (r.code != MCMC_CODE_VERSION) {
- _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
- _backend_failed_ur(be);
- return;
- }
-
- be->validating = false;
- be->rbufused = 0;
-
- // Passed validation, don't need to re-read, flush any pending writes.
- _flush_pending_write(be);
-}
-
-// TODO (v3): much code shared with proxy_event_beconn, should be able to
-// abstract out.
-// TODO (v3): further optimization would move the mcmc_connect() socket
-// creation to uring.
-static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- proxy_event_thread_t *t = udata;
- P_DEBUG("%s: got wakeup: %d\n", __func__, cqe->res);
-
- // liburing always uses eventfd for the notifier.
- // *cqe has our result.
- assert(cqe->res != -EINVAL);
- if (cqe->res != sizeof(eventfd_t)) {
- P_DEBUG("%s: cqe->res: %d\n", __func__, cqe->res);
- // FIXME (v2): figure out if this is impossible, and how to handle if not.
- assert(1 == 0);
- }
-
- // need to re-arm the listener every time.
- _proxy_evthr_evset_benotifier(t);
-
- beconn_head_t head;
-
- STAILQ_INIT(&head);
- pthread_mutex_lock(&t->mutex);
- STAILQ_CONCAT(&head, &t->beconn_head_in);
- pthread_mutex_unlock(&t->mutex);
-
- mcp_backend_t *be = NULL;
- // be can be freed by the loop, so can't use STAILQ_FOREACH.
- while (!STAILQ_EMPTY(&head)) {
- be = STAILQ_FIRST(&head);
- STAILQ_REMOVE_HEAD(&head, beconn_next);
- if (be->transferred) {
- // If this object was already transferred here, we're being
- // signalled to clean it up and free.
- _cleanup_backend(be);
- } else {
- be->transferred = true;
- int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
- if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
- // if we're already connected for some reason, still push it
- // through the connection handler to keep the code unified. It
- // will auto-wake because the socket is writeable.
- be->connecting = true;
- be->can_write = false;
- _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur);
- } else {
- _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
- _backend_failed_ur(be);
- }
- }
- }
-
-}
-
-static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- proxy_event_thread_t *t = udata;
-
- // liburing always uses eventfd for the notifier.
- // *cqe has our result.
- assert(cqe->res != -EINVAL);
- if (cqe->res != sizeof(eventfd_t)) {
- P_DEBUG("%s: cqe->res: %d\n", __func__, cqe->res);
- // FIXME (v2): figure out if this is impossible, and how to handle if not.
- assert(1 == 0);
- }
-
- // need to re-arm the listener every time.
- _proxy_evthr_evset_notifier(t);
-
- // TODO (v2): sqe queues for writing to backends
- // - _ur handler for backend write completion is to set a read event and
- // re-submit. ugh.
- // Should be possible to have standing reads, but flow is harder and lets
- // optimize that later. (ie; allow matching reads to a request but don't
- // actually dequeue anything until both read and write are confirmed)
- if (_proxy_event_handler_dequeue(t) == 0) {
- //P_DEBUG("%s: no IO's to complete\n", __func__);
- return;
- }
-
- // Re-walk each backend and check set event as required.
- mcp_backend_t *be = NULL;
-
- // TODO (v2): for each backend, queue writev's into sqe's
- // move the backend sqe bits into a write complete handler
- STAILQ_FOREACH(be, &t->be_head, be_next) {
- be->stacked = false;
-
- if (be->connecting || be->validating) {
- P_DEBUG("%s: deferring IO pending connecting\n", __func__);
- } else {
- _flush_pending_write_ur(be);
- }
- }
-}
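
Both notifier handlers depend on eventfd semantics: a single 8-byte read drains the counter and so absorbs any number of pending wakeups at once, and because uring reads are one-shot, the listener must be re-armed on every completion. The producer side, for reference, is a plain eventfd write (illustrative sketch; the actual wakeup path lives elsewhere in the proxy code):

    #include <sys/eventfd.h>

    // Hypothetical producer: after queueing work under the thread's
    // mutex, bump the eventfd so the armed uring read completes with
    // res == sizeof(eventfd_t).
    static void wake_event_thread(int event_fd) {
        eventfd_write(event_fd, 1);
    }
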
-
-static void _proxy_evthr_evset_be_readvalidate(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts) {
- P_DEBUG("%s: setting: %lu\n", __func__, len);
- struct io_uring_sqe *sqe;
- if (be->ur_rd_ev.set) {
- P_DEBUG("%s: already set\n", __func__);
- return;
- }
-
- be->ur_rd_ev.cb = proxy_backend_beconn_validate_ur;
- be->ur_rd_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // FIXME (v2): NULL?
- assert(be->rbuf != NULL);
- io_uring_prep_recv(sqe, mcmc_fd(be->client), buf, len, 0);
- io_uring_sqe_set_data(sqe, &be->ur_rd_ev);
- be->ur_rd_ev.set = true;
-
- sqe->flags |= IOSQE_IO_LINK;
-
- // add a timeout.
- be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
- be->ur_te_ev.udata = be;
- sqe = io_uring_get_sqe(&be->event_thread->ring);
-
- io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
-}
-
-// reuse the write handler event for pending connections.
-static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timespec *ts) {
- struct io_uring_sqe *sqe;
- P_DEBUG("%s: setting\n", __func__);
- if (be->ur_wr_ev.set)
- return;
-
- be->ur_wr_ev.cb = proxy_backend_beconn_ur;
- be->ur_wr_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // FIXME (v2): NULL?
-
- io_uring_prep_poll_add(sqe, mcmc_fd(be->client), POLLOUT);
- io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
- be->ur_wr_ev.set = true;
-
- sqe->flags |= IOSQE_IO_LINK;
-
- // add a timeout.
- // FIXME: do I need to change this at all?
- be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
- be->ur_te_ev.udata = be;
- sqe = io_uring_get_sqe(&be->event_thread->ring);
-
- io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
-}
-
-// reusing the ur_wr_ev.
-static void _proxy_evthr_evset_be_writev(mcp_backend_t *be, int iovcnt, struct __kernel_timespec *ts) {
- struct io_uring_sqe *sqe;
- if (be->ur_wr_ev.set)
- return;
-
- be->ur_wr_ev.cb = proxy_backend_postwrite_ur;
- be->ur_wr_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // FIXME (v2): NULL?
-
- if (iovcnt == 1) {
- io_uring_prep_write(sqe, mcmc_fd(be->client), be->write_iovs[0].iov_base, be->write_iovs[0].iov_len, 0);
- } else {
- io_uring_prep_writev(sqe, mcmc_fd(be->client), be->write_iovs, iovcnt, 0);
- }
- io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
- be->ur_wr_ev.set = true;
-
- sqe->flags |= IOSQE_IO_LINK;
-
- // add a timeout.
- be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
- be->ur_te_ev.udata = be;
- sqe = io_uring_get_sqe(&be->event_thread->ring);
-
- io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
-}
-
-static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts) {
- struct io_uring_sqe *sqe;
- if (be->ur_wr_ev.set)
- return;
-
- be->ur_wr_ev.cb = proxy_backend_wrhandler_ur;
- be->ur_wr_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // FIXME (v2): NULL?
-
- io_uring_prep_poll_add(sqe, mcmc_fd(be->client), POLLOUT);
- io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
- be->ur_wr_ev.set = true;
-
- sqe->flags |= IOSQE_IO_LINK;
-
- // add a timeout.
- be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
- be->ur_te_ev.udata = be;
- sqe = io_uring_get_sqe(&be->event_thread->ring);
-
- io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
-}
-
-static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts) {
- P_DEBUG("%s: setting: %lu\n", __func__, len);
- struct io_uring_sqe *sqe;
- if (be->ur_rd_ev.set) {
- P_DEBUG("%s: already set\n", __func__);
- return;
- }
-
- be->ur_rd_ev.cb = proxy_backend_handler_ur;
- be->ur_rd_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // FIXME (v2): NULL?
- assert(be->rbuf != NULL);
- io_uring_prep_recv(sqe, mcmc_fd(be->client), buf, len, 0);
- io_uring_sqe_set_data(sqe, &be->ur_rd_ev);
- be->ur_rd_ev.set = true;
-
- sqe->flags |= IOSQE_IO_LINK;
-
- // add a timeout.
- // TODO (v2): we can pre-set the event data and avoid always re-doing it here.
- be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
- be->ur_te_ev.udata = be;
- sqe = io_uring_get_sqe(&be->event_thread->ring);
-
- io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
-
-}
-
-// FIXME: can this be inside the function?
-//static eventfd_t dummy_event = 1;
-// TODO: in newer versions of uring we can set ignore success?
-/*static void _proxy_evthr_evset_wnotify(proxy_event_thread_t *t, int notify_fd) {
- struct io_uring_sqe *sqe;
-
- sqe = io_uring_get_sqe(&t->ring);
- // FIXME (v2) NULL?
-
- io_uring_prep_write(sqe, notify_fd, &dummy_event, sizeof(dummy_event), 0);
- io_uring_sqe_set_data(sqe, NULL);
-}*/
-
-static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t) {
- struct io_uring_sqe *sqe;
- P_DEBUG("%s: setting: %d\n", __func__, t->ur_benotify_event.set);
- if (t->ur_benotify_event.set)
- return;
-
- t->ur_benotify_event.cb = proxy_beconn_handler_ur;
- t->ur_benotify_event.udata = t;
-
- sqe = io_uring_get_sqe(&t->ring);
- // FIXME (v2): NULL?
- io_uring_prep_read(sqe, t->be_event_fd, &t->beevent_counter, sizeof(eventfd_t), 0);
- io_uring_sqe_set_data(sqe, &t->ur_benotify_event);
-}
-
-static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) {
- struct io_uring_sqe *sqe;
- P_DEBUG("%s: setting: %d\n", __func__, t->ur_notify_event.set);
- if (t->ur_notify_event.set)
- return;
-
- t->ur_notify_event.cb = proxy_event_handler_ur;
- t->ur_notify_event.udata = t;
-
- sqe = io_uring_get_sqe(&t->ring);
- // FIXME (v2): NULL?
- io_uring_prep_read(sqe, t->event_fd, &t->event_counter, sizeof(eventfd_t), 0);
- io_uring_sqe_set_data(sqe, &t->ur_notify_event);
-}
-
-// TODO (v2): IOURING_FEAT_NODROP: uring_submit() should return -EBUSY if out of CQ
-// events slots. Therefore might starve SQE's if we were low beforehand.
-// - when uring events are armed, they should link into an STAILQ
-// - after all cqe's are processed from the loop, walk the queued events
-// - generate SQE's as necessary, bailing if we run out before running out of
-// events.
-// - submit the SQE's
-// - if it bails on -EBUSY due to too many CQE's, run the CQE loop again
-// - submit if there were pending SQE's before resuming walking the event
-// chain.
-//
-// Think this is the best compromise; doesn't use temporary memory for
-// processing CQE's, and we already have dedicated memory for the SQE side of
-// things so adding a little more for an STAILQ is fine.
-// Until then this code will deadlock and die if -EBUSY happens.
-void *proxy_event_thread_ur(void *arg) {
- proxy_event_thread_t *t = arg;
- struct io_uring_cqe *cqe;
-
- P_DEBUG("%s: starting\n", __func__);
-
- logger_create(); // TODO (v2): add logger to struct
- while (1) {
- P_DEBUG("%s: submit and wait\n", __func__);
- io_uring_submit_and_wait(&t->ring, 1);
- //P_DEBUG("%s: sqe submitted: %d\n", __func__, ret);
-
- uint32_t head = 0;
- uint32_t count = 0;
-
- io_uring_for_each_cqe(&t->ring, head, cqe) {
- P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count);
-
- proxy_event_t *pe = io_uring_cqe_get_data(cqe);
- if (pe != NULL) {
- pe->set = false;
- pe->cb(pe->udata, cqe);
- }
-
- count++;
- }
-
- P_DEBUG("%s: advancing [count:%d]\n", __func__, count);
- io_uring_cq_advance(&t->ring, count);
- }
-
- return NULL;
-}
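
One possible shape of the -EBUSY recovery described in the TODO block above the function (purely illustrative; reap_completions is a hypothetical helper running the same CQE loop as the thread body):

    // Hypothetical sketch: with IORING_FEAT_NODROP, submit returns
    // -EBUSY when the CQ ring is full, so drain completions and retry.
    int ret;
    do {
        ret = io_uring_submit(&t->ring);
        if (ret == -EBUSY) {
            reap_completions(t); // make room in the CQ ring.
        }
    } while (ret == -EBUSY);
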
-#endif // HAVE_LIBURING
-
static void _cleanup_backend(mcp_backend_t *be) {
-#ifdef HAVE_LIBURING
- if (be->event_thread->use_uring) {
- // TODO: cancel any live uring events.
- } else {
-#endif
// remove any pending events.
int pending = event_pending(&be->main_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) {
@@ -736,9 +174,6 @@ static void _cleanup_backend(mcp_backend_t *be) {
if ((pending & (EV_TIMEOUT)) != 0) {
event_del(&be->timeout_event); // an error to call event_del() without event.
}
-#ifdef HAVE_LIBURING
- }
-#endif
// - assert on empty queue
assert(STAILQ_EMPTY(&be->io_head));
@@ -1667,64 +1102,9 @@ void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct e
// initialize the event system.
#ifdef HAVE_LIBURING
- fprintf(stderr, "Sorry, io_uring not supported right now\n");
- abort();
- bool use_uring = t->ctx->use_uring;
- struct io_uring_params p = {0};
- assert(t->event_fd); // uring only exists where eventfd also does.
-
- // Setup the CQSIZE to be much larger than SQ size, since backpressure
- // issues can cause us to block on SQ submissions and as a network server,
- // stuff happens.
-
- if (use_uring) {
- p.flags = IORING_SETUP_CQSIZE;
- p.cq_entries = PRING_QUEUE_CQ_ENTRIES;
- int ret = io_uring_queue_init_params(PRING_QUEUE_SQ_ENTRIES, &t->ring, &p);
- if (ret) {
- perror("io_uring_queue_init_params");
- exit(1);
- }
- if (!(p.features & IORING_FEAT_NODROP)) {
- fprintf(stderr, "uring: kernel missing IORING_FEAT_NODROP, using libevent\n");
- use_uring = false;
- }
- if (!(p.features & IORING_FEAT_SINGLE_MMAP)) {
- fprintf(stderr, "uring: kernel missing IORING_FEAT_SINGLE_MMAP, using libevent\n");
- use_uring = false;
- }
- if (!(p.features & IORING_FEAT_FAST_POLL)) {
- fprintf(stderr, "uring: kernel missing IORING_FEAT_FAST_POLL, using libevent\n");
- use_uring = false;
- }
-
- if (use_uring) {
- // FIXME (v2): Sigh. we need a blocking event_fd for io_uring but we've a
- // chicken and egg in here. need a better structure... in meantime
- // re-create the event_fd.
-
- // set the new request handler.
- close(t->event_fd);
- t->event_fd = eventfd(0, 0);
- // FIXME (v2): hack for event init.
- t->ur_notify_event.set = false;
- _proxy_evthr_evset_notifier(t);
-
- // set the new backend connection handler.
- close(t->be_event_fd);
- t->be_event_fd = eventfd(0, 0);
- t->ur_benotify_event.set = false;
- _proxy_evthr_evset_benotifier(t);
-
- t->use_uring = true;
- return;
- } else {
- // Decided to not use io_uring, so don't waste memory.
- t->use_uring = false;
- io_uring_queue_exit(&t->ring);
- }
- } else {
- t->use_uring = false;
+ if (t->ctx->use_uring) {
+ fprintf(stderr, "Sorry, io_uring not supported right now\n");
+ abort();
}
#endif