author    dormando <dormando@rydia.net>  2023-03-13 21:47:54 -0700
committer dormando <dormando@rydia.net>  2023-03-26 16:48:37 -0700
commit    595758ad02ac3ab57ae981c9d736563812861eaf (patch)
tree      aec2940c0794667c1c1dfb3c17ad3f6074f2c2dd
parent    6d767eb78ae256375cfe23a0a91ea6e12e046d6b (diff)
download  memcached-595758ad02ac3ab57ae981c9d736563812861eaf.tar.gz
proxy: rip out io_uring code
With the event handler rewrite, the IO thread scales much better (up to 8-12 worker threads), leaving the io_uring code in the dust. Realistically, io_uring won't be able to beat the event code if you're using a kernel older than 6.2, which is brand new. Instead of carrying all this code around and having people randomly try it to get more performance, I want to rip it out of the way and add it back in later, when it makes sense. I am using mcshredder as a platform to learn and keep up to date with io_uring, and will port over its usage pattern when the time is right.
-rw-r--r--  memcached.c       3
-rw-r--r--  proto_proxy.c    13
-rw-r--r--  proxy_lua.c      24
-rw-r--r--  proxy_network.c 626
4 files changed, 3 insertions(+), 663 deletions(-)
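
For reference, nearly every evset helper removed below follows the same liburing idiom: an I/O SQE flagged IOSQE_IO_LINK, chained to a link-timeout SQE, so the kernel fails the I/O itself (e.g. with -ECANCELED) when the deadline passes and no explicit timer handler is needed. A minimal standalone sketch of that pattern, with illustrative names (ring, fd, buf) rather than memcached's actual code:

    /* Sketch only, not memcached code: a recv SQE linked to a timeout SQE.
     * If the recv doesn't complete within *ts, the kernel completes it with
     * an error rather than leaving a dangling timer to clean up. */
    #include <liburing.h>

    void arm_read_with_timeout(struct io_uring *ring, int fd,
                               char *buf, size_t len,
                               struct __kernel_timespec *ts) {
        struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
        io_uring_prep_recv(sqe, fd, buf, len, 0);
        io_uring_sqe_set_data(sqe, buf);   /* completion context for the CQE */
        sqe->flags |= IOSQE_IO_LINK;       /* chain the next SQE to this one */

        struct io_uring_sqe *tsqe = io_uring_get_sqe(ring);
        io_uring_prep_link_timeout(tsqe, ts, 0);
        io_uring_sqe_set_data(tsqe, NULL); /* timeout CQE can be ignored */

        io_uring_submit(ring);
    }

This mirrors _proxy_evthr_evset_be_read and friends in the diff, which set a per-backend proxy_event_t as the CQE user data and leave proxy_backend_timeout_handler_ur as a no-op, since the linked request returns the error directly.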
diff --git a/memcached.c b/memcached.c
index 8e8a180..306a952 100644
--- a/memcached.c
+++ b/memcached.c
@@ -4158,9 +4158,6 @@ static void usage(void) {
#endif
#ifdef PROXY
printf(" - proxy_config: path to lua config file.\n");
-#ifdef HAVE_LIBURING
- printf(" - proxy_uring: enable IO_URING for proxy backends.\n");
-#endif
#endif
#ifdef TLS
printf(" - ssl_chain_cert: certificate chain file in PEM format\n"
diff --git a/proto_proxy.c b/proto_proxy.c
index 9a6736b..e99f893 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -154,11 +154,6 @@ void *proxy_init(bool use_uring) {
ctx->tunables.connect.tv_sec = 5;
ctx->tunables.retry.tv_sec = 3;
ctx->tunables.read.tv_sec = 3;
-#ifdef HAVE_LIBURING
- ctx->tunables.connect_ur.tv_sec = 5;
- ctx->tunables.retry_ur.tv_sec = 3;
- ctx->tunables.read_ur.tv_sec = 3;
-#endif // HAVE_LIBURING
STAILQ_INIT(&ctx->manager_head);
lua_State *L = luaL_newstate();
@@ -173,15 +168,7 @@ void *proxy_init(bool use_uring) {
ctx->proxy_io_thread = t;
proxy_init_event_thread(t, ctx, NULL);
-#ifdef HAVE_LIBURING
- if (t->use_uring) {
- pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
- } else {
- pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
- }
-#else
pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
-#endif // HAVE_LIBURING
thread_setname(t->thread_id, "mc-prx-io");
_start_proxy_config_threads(ctx);
diff --git a/proxy_lua.c b/proxy_lua.c
index 3297d9f..85de700 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -220,10 +220,6 @@ static int mcplib_backend(lua_State *L) {
be->tunables.connect.tv_sec = secondsi;
be->tunables.connect.tv_usec = MICROSECONDS(subseconds);
-#ifdef HAVE_LIBURING
- be->tunables.connect_ur.tv_sec = secondsi;
- be->tunables.connect_ur.tv_nsec = NANOSECONDS(subseconds);
-#endif
}
lua_pop(L, 1);
@@ -234,10 +230,6 @@ static int mcplib_backend(lua_State *L) {
be->tunables.retry.tv_sec = secondsi;
be->tunables.retry.tv_usec = MICROSECONDS(subseconds);
-#ifdef HAVE_LIBURING
- be->tunables.retry_ur.tv_sec = secondsi;
- be->tunables.retry_ur.tv_nsec = NANOSECONDS(subseconds);
-#endif
}
lua_pop(L, 1);
@@ -248,10 +240,6 @@ static int mcplib_backend(lua_State *L) {
be->tunables.read.tv_sec = secondsi;
be->tunables.read.tv_usec = MICROSECONDS(subseconds);
-#ifdef HAVE_LIBURING
- be->tunables.read_ur.tv_sec = secondsi;
- be->tunables.read_ur.tv_nsec = NANOSECONDS(subseconds);
-#endif
}
lua_pop(L, 1);
@@ -831,10 +819,6 @@ static int mcplib_backend_connect_timeout(lua_State *L) {
STAT_L(ctx);
ctx->tunables.connect.tv_sec = secondsi;
ctx->tunables.connect.tv_usec = MICROSECONDS(subseconds);
-#ifdef HAVE_LIBURING
- ctx->tunables.connect_ur.tv_sec = secondsi;
- ctx->tunables.connect_ur.tv_nsec = NANOSECONDS(subseconds);
-#endif
STAT_UL(ctx);
return 0;
@@ -849,10 +833,6 @@ static int mcplib_backend_retry_timeout(lua_State *L) {
STAT_L(ctx);
ctx->tunables.retry.tv_sec = secondsi;
ctx->tunables.retry.tv_usec = MICROSECONDS(subseconds);
-#ifdef HAVE_LIBURING
- ctx->tunables.retry_ur.tv_sec = secondsi;
- ctx->tunables.retry_ur.tv_nsec = NANOSECONDS(subseconds);
-#endif
STAT_UL(ctx);
return 0;
@@ -867,10 +847,6 @@ static int mcplib_backend_read_timeout(lua_State *L) {
STAT_L(ctx);
ctx->tunables.read.tv_sec = secondsi;
ctx->tunables.read.tv_usec = MICROSECONDS(subseconds);
-#ifdef HAVE_LIBURING
- ctx->tunables.read_ur.tv_sec = secondsi;
- ctx->tunables.read_ur.tv_nsec = NANOSECONDS(subseconds);
-#endif
STAT_UL(ctx);
return 0;
diff --git a/proxy_network.c b/proxy_network.c
index 1d9db29..ca56baa 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -160,569 +160,7 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
return io_count;
}
-#ifdef HAVE_LIBURING
-//static void _proxy_evthr_evset_wnotify(proxy_event_thread_t *t, int notify_fd);
-static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
-static void _proxy_evthr_evset_be_writev(mcp_backend_t *be, int iovcnt, struct __kernel_timespec *ts);
-static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts);
-static void _proxy_evthr_evset_be_retry(mcp_backend_t *be);
-static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timespec *ts);
-static void _proxy_evthr_evset_be_readvalidate(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
-static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t);
-static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t);
-static void _backend_failed_ur(mcp_backend_t *be);
-
-static void _flush_pending_write_ur(mcp_backend_t *be) {
- // Allow us to be called with an empty stack to prevent dev errors.
- if (STAILQ_EMPTY(&be->io_head)) {
- return;
- }
-
- int iovcnt = _prep_pending_write(be);
-
- // TODO: write timeout.
- _proxy_evthr_evset_be_writev(be, iovcnt, &be->tunables.read_ur);
-}
-
-// TODO: we shouldn't handle reads if a write is pending, so postwrite should
-// check for pending read data before going into read mode.
-// need be->writing flag to toggle?
-static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- P_DEBUG("%s: %d\n", __func__, cqe->res);
- assert(cqe->res != -EINVAL);
- int sent = cqe->res;
- if (sent < 0) {
- // FIXME: sent == 0 is disconnected? I keep forgetting.
- if (sent == -EAGAIN || sent == -EWOULDBLOCK) {
- // didn't do any writing, wait for a writeable socket.
- _proxy_evthr_evset_be_wrpoll(be, &be->tunables.read_ur);
- } else {
- _reset_bad_backend(be, P_BE_FAIL_WRITING);
- _backend_failed_ur(be);
- }
- }
-
- if (_post_pending_write(be, sent)) {
- // commands were flushed, set read handler.
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur);
- }
-
- if (be->io_next) {
- // still have unflushed commands, re-run write command.
- // writev can't "block if EAGAIN" in io_uring so far as I can tell, so
- // we have to switch to polling mode here.
- _proxy_evthr_evset_be_wrpoll(be, &be->tunables.read_ur);
- }
-
- // TODO: if rbufused != 0, push through drive machine?
-}
-
-// No-op at the moment. when the linked timeout fires uring returns the
-// linked request (read/write/poll/etc) with an interrupted/timeout/cancelled
-// error. So we don't need to explicitly handle timeouts.
-// I'm leaving the structure in to simplify the callback routine.
-// Since timeouts rarely get called the extra code here shouldn't matter.
-static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- return;
-}
-
-static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur);
-}
-
-static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
- struct io_uring_sqe *sqe;
- if (be->ur_te_ev.set)
- return;
-
- be->ur_te_ev.cb = proxy_backend_retry_handler_ur;
- be->ur_te_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // TODO (v2): NULL?
-
- io_uring_prep_timeout(sqe, &be->tunables.retry_ur, 0, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
- be->ur_te_ev.set = true;
-}
-
-static void _backend_failed_ur(mcp_backend_t *be) {
- if (++be->failed_count > be->tunables.backend_failure_limit) {
- P_DEBUG("%s: marking backend as bad\n", __func__);
- be->bad = true;
- _proxy_evthr_evset_be_retry(be);
- STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
- } else {
- _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur);
- STAT_INCR(be->event_thread->ctx, backend_failed, 1);
- }
-}
-
-// read handler.
-static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- int bread = cqe->res;
- // Error or disconnection.
- if (bread <= 0) {
- _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
- _backend_failed_ur(be);
- return;
- }
-
- be->rbufused += bread;
- int res = proxy_backend_drive_machine(be);
-
- if (res != 0) {
- _reset_bad_backend(be, res);
- _backend_failed_ur(be);
- return;
- }
-
- // TODO (v2): when exactly do we need to reset the backend handler?
- if (!STAILQ_EMPTY(&be->io_head)) {
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur);
- }
-}
-
-static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
-
- be->can_write = true;
- _flush_pending_write_ur(be);
-
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur);
-}
-
-// a backend with an outstanding new connection has become writeable.
-// check validity.
-// TODO: this gets an error if cancelled right?
-static void proxy_backend_beconn_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- int err = 0;
- assert(be->connecting);
-/* if (_proxy_beconn_checkconnect(be) == -1) {
- return;
- } */
-
- // We were connecting, now ensure we're properly connected.
- if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
- P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
- // kick the bad backend, clear the queue, retry later.
- // FIXME (v2): if a connect fails, anything currently in the queue
- // should be safe to hold up until their timeout.
- _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
- _backend_failed_ur(be);
- return;
- }
- P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
- be->connecting = false;
- be->state = mcp_backend_read;
- be->bad = false;
- be->failed_count = 0;
-
- be->validating = true;
- // TODO: make validation optional.
-
- if (_beconn_send_validate(be) == -1) {
- _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
- _backend_failed_ur(be);
- return;
- } else {
- // buffer should be empty during validation stage.
- assert(be->rbufused == 0);
- }
-
- // TODO: make validation optional.
- // set next handler on recv for validity check.
- _proxy_evthr_evset_be_readvalidate(be, be->rbuf, READ_BUFFER_SIZE, &be->tunables.read_ur);
-}
-
-// TODO: share more code with proxy_beconn_handler
-static void proxy_backend_beconn_validate_ur(void *udata, struct io_uring_cqe *cqe) {
- mcp_backend_t *be = udata;
- mcmc_resp_t r;
- assert(be->validating);
- assert(cqe->res != -EINVAL);
- P_DEBUG("%s: checking validation: %d\n", __func__, cqe->res);
-
- int bread = cqe->res;
- // Error or disconnection.
- if (bread <= 0) {
- _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
- _backend_failed_ur(be);
- return;
- }
-
- be->rbufused += bread;
-
- int status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r);
- if (status == MCMC_ERR) {
- // Needed more data for a version line, somehow. For the uring code
- // we'll treat that as an error, for now.
- // TODO: re-schedule self if r.code == MCMC_WANT_READ.
-
- _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
- _backend_failed_ur(be);
- return;
- }
-
- if (r.code != MCMC_CODE_VERSION) {
- _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
- _backend_failed_ur(be);
- return;
- }
-
- be->validating = false;
- be->rbufused = 0;
-
- // Passed validation, don't need to re-read, flush any pending writes.
- _flush_pending_write(be);
-}
-
-// TODO (v3): much code shared with proxy_event_beconn, should be able to
-// abstract out.
-// TODO (v3): further optimization would move the mcmc_connect() socket
-// creation to uring.
-static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- proxy_event_thread_t *t = udata;
- P_DEBUG("%s: got wakeup: %d\n", __func__, cqe->res);
-
- // liburing always uses eventfd for the notifier.
- // *cqe has our result.
- assert(cqe->res != -EINVAL);
- if (cqe->res != sizeof(eventfd_t)) {
- P_DEBUG("%s: cqe->res: %d\n", __func__, cqe->res);
- // FIXME (v2): figure out if this is impossible, and how to handle if not.
- assert(1 == 0);
- }
-
- // need to re-arm the listener every time.
- _proxy_evthr_evset_benotifier(t);
-
- beconn_head_t head;
-
- STAILQ_INIT(&head);
- pthread_mutex_lock(&t->mutex);
- STAILQ_CONCAT(&head, &t->beconn_head_in);
- pthread_mutex_unlock(&t->mutex);
-
- mcp_backend_t *be = NULL;
- // be can be freed by the loop, so can't use STAILQ_FOREACH.
- while (!STAILQ_EMPTY(&head)) {
- be = STAILQ_FIRST(&head);
- STAILQ_REMOVE_HEAD(&head, beconn_next);
- if (be->transferred) {
- // If this object was already transferred here, we're being
- // signalled to clean it up and free.
- _cleanup_backend(be);
- } else {
- be->transferred = true;
- int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
- if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
- // if we're already connected for some reason, still push it
- // through the connection handler to keep the code unified. It
- // will auto-wake because the socket is writeable.
- be->connecting = true;
- be->can_write = false;
- _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur);
- } else {
- _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
- _backend_failed_ur(be);
- }
- }
- }
-
-}
-
-static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
- proxy_event_thread_t *t = udata;
-
- // liburing always uses eventfd for the notifier.
- // *cqe has our result.
- assert(cqe->res != -EINVAL);
- if (cqe->res != sizeof(eventfd_t)) {
- P_DEBUG("%s: cqe->res: %d\n", __func__, cqe->res);
- // FIXME (v2): figure out if this is impossible, and how to handle if not.
- assert(1 == 0);
- }
-
- // need to re-arm the listener every time.
- _proxy_evthr_evset_notifier(t);
-
- // TODO (v2): sqe queues for writing to backends
- // - _ur handler for backend write completion is to set a read event and
- // re-submit. ugh.
- // Should be possible to have standing reads, but flow is harder and lets
- // optimize that later. (ie; allow matching reads to a request but don't
- // actually dequeue anything until both read and write are confirmed)
- if (_proxy_event_handler_dequeue(t) == 0) {
- //P_DEBUG("%s: no IO's to complete\n", __func__);
- return;
- }
-
- // Re-walk each backend and check set event as required.
- mcp_backend_t *be = NULL;
-
- // TODO (v2): for each backend, queue writev's into sqe's
- // move the backend sqe bits into a write complete handler
- STAILQ_FOREACH(be, &t->be_head, be_next) {
- be->stacked = false;
-
- if (be->connecting || be->validating) {
- P_DEBUG("%s: deferring IO pending connecting\n", __func__);
- } else {
- _flush_pending_write_ur(be);
- }
- }
-}
-
-static void _proxy_evthr_evset_be_readvalidate(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts) {
- P_DEBUG("%s: setting: %lu\n", __func__, len);
- struct io_uring_sqe *sqe;
- if (be->ur_rd_ev.set) {
- P_DEBUG("%s: already set\n", __func__);
- return;
- }
-
- be->ur_rd_ev.cb = proxy_backend_beconn_validate_ur;
- be->ur_rd_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // FIXME (v2): NULL?
- assert(be->rbuf != NULL);
- io_uring_prep_recv(sqe, mcmc_fd(be->client), buf, len, 0);
- io_uring_sqe_set_data(sqe, &be->ur_rd_ev);
- be->ur_rd_ev.set = true;
-
- sqe->flags |= IOSQE_IO_LINK;
-
- // add a timeout.
- be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
- be->ur_te_ev.udata = be;
- sqe = io_uring_get_sqe(&be->event_thread->ring);
-
- io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
-}
-
-// reuse the write handler event for pending connections.
-static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timespec *ts) {
- struct io_uring_sqe *sqe;
- P_DEBUG("%s: setting\n", __func__);
- if (be->ur_wr_ev.set)
- return;
-
- be->ur_wr_ev.cb = proxy_backend_beconn_ur;
- be->ur_wr_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // FIXME (v2): NULL?
-
- io_uring_prep_poll_add(sqe, mcmc_fd(be->client), POLLOUT);
- io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
- be->ur_wr_ev.set = true;
-
- sqe->flags |= IOSQE_IO_LINK;
-
- // add a timeout.
- // FIXME: do I need to change this at all?
- be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
- be->ur_te_ev.udata = be;
- sqe = io_uring_get_sqe(&be->event_thread->ring);
-
- io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
-}
-
-// reusing the ur_wr_ev.
-static void _proxy_evthr_evset_be_writev(mcp_backend_t *be, int iovcnt, struct __kernel_timespec *ts) {
- struct io_uring_sqe *sqe;
- if (be->ur_wr_ev.set)
- return;
-
- be->ur_wr_ev.cb = proxy_backend_postwrite_ur;
- be->ur_wr_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // FIXME (v2): NULL?
-
- if (iovcnt == 1) {
- io_uring_prep_write(sqe, mcmc_fd(be->client), be->write_iovs[0].iov_base, be->write_iovs[0].iov_len, 0);
- } else {
- io_uring_prep_writev(sqe, mcmc_fd(be->client), be->write_iovs, iovcnt, 0);
- }
- io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
- be->ur_wr_ev.set = true;
-
- sqe->flags |= IOSQE_IO_LINK;
-
- // add a timeout.
- be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
- be->ur_te_ev.udata = be;
- sqe = io_uring_get_sqe(&be->event_thread->ring);
-
- io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
-}
-
-static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts) {
- struct io_uring_sqe *sqe;
- if (be->ur_wr_ev.set)
- return;
-
- be->ur_wr_ev.cb = proxy_backend_wrhandler_ur;
- be->ur_wr_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // FIXME (v2): NULL?
-
- io_uring_prep_poll_add(sqe, mcmc_fd(be->client), POLLOUT);
- io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
- be->ur_wr_ev.set = true;
-
- sqe->flags |= IOSQE_IO_LINK;
-
- // add a timeout.
- be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
- be->ur_te_ev.udata = be;
- sqe = io_uring_get_sqe(&be->event_thread->ring);
-
- io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
-}
-
-static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts) {
- P_DEBUG("%s: setting: %lu\n", __func__, len);
- struct io_uring_sqe *sqe;
- if (be->ur_rd_ev.set) {
- P_DEBUG("%s: already set\n", __func__);
- return;
- }
-
- be->ur_rd_ev.cb = proxy_backend_handler_ur;
- be->ur_rd_ev.udata = be;
-
- sqe = io_uring_get_sqe(&be->event_thread->ring);
- // FIXME (v2): NULL?
- assert(be->rbuf != NULL);
- io_uring_prep_recv(sqe, mcmc_fd(be->client), buf, len, 0);
- io_uring_sqe_set_data(sqe, &be->ur_rd_ev);
- be->ur_rd_ev.set = true;
-
- sqe->flags |= IOSQE_IO_LINK;
-
- // add a timeout.
- // TODO (v2): we can pre-set the event data and avoid always re-doing it here.
- be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
- be->ur_te_ev.udata = be;
- sqe = io_uring_get_sqe(&be->event_thread->ring);
-
- io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, &be->ur_te_ev);
-
-}
-
-// FIXME: can this be inside the function?
-//static eventfd_t dummy_event = 1;
-// TODO: in newer versions of uring we can set ignore success?
-/*static void _proxy_evthr_evset_wnotify(proxy_event_thread_t *t, int notify_fd) {
- struct io_uring_sqe *sqe;
-
- sqe = io_uring_get_sqe(&t->ring);
- // FIXME (v2) NULL?
-
- io_uring_prep_write(sqe, notify_fd, &dummy_event, sizeof(dummy_event), 0);
- io_uring_sqe_set_data(sqe, NULL);
-}*/
-
-static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t) {
- struct io_uring_sqe *sqe;
- P_DEBUG("%s: setting: %d\n", __func__, t->ur_benotify_event.set);
- if (t->ur_benotify_event.set)
- return;
-
- t->ur_benotify_event.cb = proxy_beconn_handler_ur;
- t->ur_benotify_event.udata = t;
-
- sqe = io_uring_get_sqe(&t->ring);
- // FIXME (v2): NULL?
- io_uring_prep_read(sqe, t->be_event_fd, &t->beevent_counter, sizeof(eventfd_t), 0);
- io_uring_sqe_set_data(sqe, &t->ur_benotify_event);
-}
-
-static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) {
- struct io_uring_sqe *sqe;
- P_DEBUG("%s: setting: %d\n", __func__, t->ur_notify_event.set);
- if (t->ur_notify_event.set)
- return;
-
- t->ur_notify_event.cb = proxy_event_handler_ur;
- t->ur_notify_event.udata = t;
-
- sqe = io_uring_get_sqe(&t->ring);
- // FIXME (v2): NULL?
- io_uring_prep_read(sqe, t->event_fd, &t->event_counter, sizeof(eventfd_t), 0);
- io_uring_sqe_set_data(sqe, &t->ur_notify_event);
-}
-
-// TODO (v2): IOURING_FEAT_NODROP: uring_submit() should return -EBUSY if out of CQ
-// events slots. Therefore might starve SQE's if we were low beforehand.
-// - when uring events are armed, they should link into an STAILQ
-// - after all cqe's are processed from the loop, walk the queued events
-// - generate SQE's as necessary, bailing if we run out before running out of
-// events.
-// - submit the SQE's
-// - if it bails on -EBUSY due to too many CQE's, run the CQE loop again
-// - submit if there were pending SQE's before resuming walking the event
-// chain.
-//
-// Think this is the best compromise; doesn't use temporary memory for
-// processing CQE's, and we already have dedicated memory for the SQE side of
-// things so adding a little more for an STAILQ is fine.
-// Until then this code will deadlock and die if -EBUSY happens.
-void *proxy_event_thread_ur(void *arg) {
- proxy_event_thread_t *t = arg;
- struct io_uring_cqe *cqe;
-
- P_DEBUG("%s: starting\n", __func__);
-
- logger_create(); // TODO (v2): add logger to struct
- while (1) {
- P_DEBUG("%s: submit and wait\n", __func__);
- io_uring_submit_and_wait(&t->ring, 1);
- //P_DEBUG("%s: sqe submitted: %d\n", __func__, ret);
-
- uint32_t head = 0;
- uint32_t count = 0;
-
- io_uring_for_each_cqe(&t->ring, head, cqe) {
- P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count);
-
- proxy_event_t *pe = io_uring_cqe_get_data(cqe);
- if (pe != NULL) {
- pe->set = false;
- pe->cb(pe->udata, cqe);
- }
-
- count++;
- }
-
- P_DEBUG("%s: advancing [count:%d]\n", __func__, count);
- io_uring_cq_advance(&t->ring, count);
- }
-
- return NULL;
-}
-#endif // HAVE_LIBURING
-
static void _cleanup_backend(mcp_backend_t *be) {
-#ifdef HAVE_LIBURING
- if (be->event_thread->use_uring) {
- // TODO: cancel any live uring events.
- } else {
-#endif
// remove any pending events.
int pending = event_pending(&be->main_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) {
@@ -736,9 +174,6 @@ static void _cleanup_backend(mcp_backend_t *be) {
if ((pending & (EV_TIMEOUT)) != 0) {
event_del(&be->timeout_event); // an error to call event_del() without event.
}
-#ifdef HAVE_LIBURING
- }
-#endif
// - assert on empty queue
assert(STAILQ_EMPTY(&be->io_head));
@@ -1667,64 +1102,9 @@ void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct e
// initialize the event system.
#ifdef HAVE_LIBURING
- fprintf(stderr, "Sorry, io_uring not supported right now\n");
- abort();
- bool use_uring = t->ctx->use_uring;
- struct io_uring_params p = {0};
- assert(t->event_fd); // uring only exists where eventfd also does.
-
- // Setup the CQSIZE to be much larger than SQ size, since backpressure
- // issues can cause us to block on SQ submissions and as a network server,
- // stuff happens.
-
- if (use_uring) {
- p.flags = IORING_SETUP_CQSIZE;
- p.cq_entries = PRING_QUEUE_CQ_ENTRIES;
- int ret = io_uring_queue_init_params(PRING_QUEUE_SQ_ENTRIES, &t->ring, &p);
- if (ret) {
- perror("io_uring_queue_init_params");
- exit(1);
- }
- if (!(p.features & IORING_FEAT_NODROP)) {
- fprintf(stderr, "uring: kernel missing IORING_FEAT_NODROP, using libevent\n");
- use_uring = false;
- }
- if (!(p.features & IORING_FEAT_SINGLE_MMAP)) {
- fprintf(stderr, "uring: kernel missing IORING_FEAT_SINGLE_MMAP, using libevent\n");
- use_uring = false;
- }
- if (!(p.features & IORING_FEAT_FAST_POLL)) {
- fprintf(stderr, "uring: kernel missing IORING_FEAT_FAST_POLL, using libevent\n");
- use_uring = false;
- }
-
- if (use_uring) {
- // FIXME (v2): Sigh. we need a blocking event_fd for io_uring but we've a
- // chicken and egg in here. need a better structure... in meantime
- // re-create the event_fd.
-
- // set the new request handler.
- close(t->event_fd);
- t->event_fd = eventfd(0, 0);
- // FIXME (v2): hack for event init.
- t->ur_notify_event.set = false;
- _proxy_evthr_evset_notifier(t);
-
- // set the new backend connection handler.
- close(t->be_event_fd);
- t->be_event_fd = eventfd(0, 0);
- t->ur_benotify_event.set = false;
- _proxy_evthr_evset_benotifier(t);
-
- t->use_uring = true;
- return;
- } else {
- // Decided to not use io_uring, so don't waste memory.
- t->use_uring = false;
- io_uring_queue_exit(&t->ring);
- }
- } else {
- t->use_uring = false;
+ if (t->ctx->use_uring) {
+ fprintf(stderr, "Sorry, io_uring not supported right now\n");
+ abort();
}
#endif
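
For contrast, the surviving libevent path (see _cleanup_backend above, which tears down a main_event plus a separate timeout_event per backend) expresses read-with-timeout through event timeouts instead of linked SQEs. Memcached's proxy keeps a dedicated timeout_event; a simpler single-event sketch of the same idea, under assumed names rather than memcached's actual helpers:

    /* Illustrative only: a persistent read event whose tv doubles as the
     * read timeout; the callback receives EV_TIMEOUT when it expires. */
    #include <event2/event.h>

    void arm_backend_read(struct event_base *base, struct event *ev,
                          evutil_socket_t fd, event_callback_fn cb,
                          void *udata, const struct timeval *tv) {
        event_assign(ev, base, fd, EV_READ | EV_PERSIST, cb, udata);
        event_add(ev, tv);
    }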