path: root/proxy_network.c
author    dormando <dormando@rydia.net>  2022-12-13 12:48:14 -0800
committer dormando <dormando@rydia.net>  2022-12-13 12:48:14 -0800
commit    8b98647d1ad80dee3ddb49dfc51cd7bd07fe9297 (patch)
tree      3a12dd2af7f5de130a1fd0034d80716416c7685b /proxy_network.c
parent    d401611ba88db17c38fedf97d336f8085ce24bab (diff)
download  memcached-8b98647d1ad80dee3ddb49dfc51cd7bd07fe9297.tar.gz
proxy: make io_uring backend work again
updates the io_uring code to match the updates on the libevent side.

needs more work before merge:
- auditing error conditions
- try harder for some code deduplication
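The uring helpers this patch adds (_proxy_evthr_evset_be_conn, _proxy_evthr_evset_be_readvalidate) all follow the same liburing pattern: arm a recv or poll SQE, flag it IOSQE_IO_LINK, and chain a link-timeout SQE behind it so the kernel cancels the operation if the backend stalls. A minimal standalone sketch of that pattern follows; the ring, fd, buffer, and function names here are placeholders for illustration and are not part of the patch.

/* Sketch only: assumes an initialized struct io_uring (ring), a connected
 * socket (fd), and caller-owned buf/len/ts/udata; names are placeholders. */
#include <liburing.h>
#include <stddef.h>

static int arm_linked_recv_timeout(struct io_uring *ring, int fd,
                                   void *buf, size_t len,
                                   struct __kernel_timespec *ts,
                                   void *udata) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (sqe == NULL)
        return -1;                       // submission queue full; caller retries.

    io_uring_prep_recv(sqe, fd, buf, len, 0);
    io_uring_sqe_set_data(sqe, udata);   // handed back via the CQE on completion.
    sqe->flags |= IOSQE_IO_LINK;         // link the next SQE to this recv.

    struct io_uring_sqe *tsqe = io_uring_get_sqe(ring);
    if (tsqe == NULL)
        return -1;
    io_uring_prep_link_timeout(tsqe, ts, 0); // cancels the recv if it fires first.
    io_uring_sqe_set_data(tsqe, udata);

    return io_uring_submit(ring);        // the proxy event loop may batch submits instead.
}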
Diffstat (limited to 'proxy_network.c')
-rw-r--r--  proxy_network.c  368
1 files changed, 311 insertions, 57 deletions
diff --git a/proxy_network.c b/proxy_network.c
index 000666d..8ce4bab 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -42,11 +42,69 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg);
static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
static int _flush_pending_write(mcp_backend_t *be);
+static void _cleanup_backend(mcp_backend_t *be);
static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err);
static void _backend_failed(mcp_backend_t *be);
static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
static int proxy_backend_drive_machine(mcp_backend_t *be);
+/* Helper routines common to io_uring and libevent modes */
+
+// TODO (v3): doing an inline syscall here, not ideal for uring mode.
+// leaving for now since this should be extremely uncommon.
+static int _beconn_send_validate(mcp_backend_t *be) {
+ const char *str = "version\r\n";
+ const ssize_t len = strlen(str);
+
+ ssize_t res = write(mcmc_fd(be->client), str, len);
+
+ if (res == -1) {
+ return -1;
+ }
+
+ // I'm making an opinionated statement that we should be able to write
+ // "version\r\n" into a fresh socket without hitting EAGAIN.
+ if (res < len) {
+ return -1;
+ }
+
+ return 1;
+}
+
+// FIXME: make _backend_failed conditionally use _ur() so we can have one call
+// in the code and reuse more code like this.
+static int _proxy_beconn_checkconnect(mcp_backend_t *be) {
+ int err = 0;
+ // We were connecting, now ensure we're properly connected.
+ if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
+ P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
+ // kick the bad backend, clear the queue, retry later.
+ // FIXME (v2): if a connect fails, anything currently in the queue
+ // should be safe to hold up until their timeout.
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
+ _backend_failed(be);
+ return -1;
+ }
+ P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
+ be->connecting = false;
+ be->state = mcp_backend_read;
+ be->bad = false;
+ be->failed_count = 0;
+
+ be->validating = true;
+ // TODO: make validation optional.
+
+ if (_beconn_send_validate(be) == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
+ _backend_failed(be);
+ return -1;
+ } else {
+ // buffer should be empty during validation stage.
+ assert(be->rbufused == 0);
+ return 0;
+ }
+}
+
static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
io_head_t head;
@@ -108,7 +166,10 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts);
static void _proxy_evthr_evset_be_retry(mcp_backend_t *be);
+static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timespec *ts);
+static void _proxy_evthr_evset_be_readvalidate(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t);
+static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t);
static void _proxy_evthr_evset_clock(proxy_event_thread_t *t);
static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe);
struct __kernel_timespec updater_ts = {.tv_sec = 3, .tv_nsec = 0};
@@ -136,7 +197,7 @@ static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *c
static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) {
mcp_backend_t *be = udata;
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur);
+ _proxy_evthr_evset_be_conn(be, &be->event_thread->tunables.connect_ur);
}
static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
@@ -160,8 +221,10 @@ static void _backend_failed_ur(mcp_backend_t *be) {
P_DEBUG("%s: marking backend as bad\n", __func__);
be->bad = true;
_proxy_evthr_evset_be_retry(be);
+ STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
} else {
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.retry_ur);
+ _proxy_evthr_evset_be_conn(be, &be->event_thread->tunables.connect_ur);
+ STAT_INCR(be->event_thread->ctx, backend_failed, 1);
}
}
@@ -172,21 +235,16 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
// Error or disconnection.
if (bread <= 0) {
_reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
- // NOTE: Not calling backed_failed here since if the backend is busted
- // it should be caught by the connect routine.
- // This is probably not _always_ true in practice. Leaving this note
- // so I can re-evaluate later.
+ _backend_failed_ur(be);
return;
}
be->rbufused += bread;
int res = proxy_backend_drive_machine(be);
- if (res > 0) {
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
- return;
- } else if (res == -1) {
- _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ if (res != 0) {
+ _reset_bad_backend(be, res);
+ _backend_failed_ur(be);
return;
}
@@ -232,6 +290,157 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
_proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
}
+// a backend with an outstanding new connection has become writeable.
+// check validity.
+// TODO: this gets an error if cancelled right? check it?
+static void proxy_backend_beconn_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ int err = 0;
+ assert(be->connecting);
+/* if (_proxy_beconn_checkconnect(be) == -1) {
+ return;
+ } */
+
+ // We were connecting, now ensure we're properly connected.
+ if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
+ P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
+ // kick the bad backend, clear the queue, retry later.
+ // FIXME (v2): if a connect fails, anything currently in the queue
+ // should be safe to hold up until their timeout.
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
+ _backend_failed_ur(be);
+ return;
+ }
+ P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
+ be->connecting = false;
+ be->state = mcp_backend_read;
+ be->bad = false;
+ be->failed_count = 0;
+
+ be->validating = true;
+ // TODO: make validation optional.
+
+ if (_beconn_send_validate(be) == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
+ _backend_failed_ur(be);
+ return;
+ } else {
+ // buffer should be empty during validation stage.
+ assert(be->rbufused == 0);
+ }
+
+ // TODO: make validation optional.
+ // set next handler on recv for validity check.
+ _proxy_evthr_evset_be_readvalidate(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
+}
+
+// TODO: share more code with proxy_beconn_handler
+static void proxy_backend_beconn_validate_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ mcmc_resp_t r;
+ assert(be->validating);
+ assert(cqe->res != -EINVAL);
+ P_DEBUG("%s: checking validation: %d\n", __func__, cqe->res);
+
+ int bread = cqe->res;
+ // Error or disconnection.
+ if (bread <= 0) {
+ _reset_bad_backend(be, P_BE_FAIL_DISCONNECTED);
+ _backend_failed_ur(be);
+ return;
+ }
+
+ be->rbufused += bread;
+
+ int status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r);
+ if (status == MCMC_ERR) {
+ // Needed more data for a version line, somehow. For the uring code
+ // we'll treat that as an error, for now.
+ // TODO: re-schedule self if r.code == MCMC_WANT_READ.
+
+ _reset_bad_backend(be, P_BE_FAIL_READVALIDATE);
+ _backend_failed_ur(be);
+ return;
+ }
+
+ if (r.code != MCMC_CODE_VERSION) {
+ _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
+ _backend_failed_ur(be);
+ return;
+ }
+
+ be->validating = false;
+ be->rbufused = 0;
+
+ // Passed validation, don't need to re-read, flush any pending writes.
+ int res = _flush_pending_write(be);
+ if (res == -1) {
+ _reset_bad_backend(be, P_BE_FAIL_WRITING);
+ _backend_failed_ur(be);
+ return;
+ }
+
+ if (!STAILQ_EMPTY(&be->io_head)) {
+ _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
+ }
+}
+
+// TODO (v3): much code shared with proxy_event_beconn, should be able to
+// abstract out.
+// TODO (v3): further optimization would move the mcmc_connect() socket
+// creation to uring.
+static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ proxy_event_thread_t *t = udata;
+ P_DEBUG("%s: got wakeup: %d\n", __func__, cqe->res);
+
+ // liburing always uses eventfd for the notifier.
+ // *cqe has our result.
+ assert(cqe->res != -EINVAL);
+ if (cqe->res != sizeof(eventfd_t)) {
+ P_DEBUG("%s: cqe->res: %d\n", __func__, cqe->res);
+ // FIXME (v2): figure out if this is impossible, and how to handle if not.
+ assert(1 == 0);
+ }
+
+ // need to re-arm the listener every time.
+ _proxy_evthr_evset_benotifier(t);
+
+ beconn_head_t head;
+
+ STAILQ_INIT(&head);
+ pthread_mutex_lock(&t->mutex);
+ STAILQ_CONCAT(&head, &t->beconn_head_in);
+ pthread_mutex_unlock(&t->mutex);
+
+ mcp_backend_t *be = NULL;
+ // be can be freed by the loop, so can't use STAILQ_FOREACH.
+ while (!STAILQ_EMPTY(&head)) {
+ be = STAILQ_FIRST(&head);
+ STAILQ_REMOVE_HEAD(&head, beconn_next);
+ if (be->transferred) {
+ // If this object was already transferred here, we're being
+ // signalled to clean it up and free.
+ _cleanup_backend(be);
+ } else {
+ be->transferred = true;
+ be->event_thread = t;
+ int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
+ if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
+ // if we're already connected for some reason, still push it
+ // through the connection handler to keep the code unified. It
+ // will auto-wake because the socket is writeable.
+ be->connecting = true;
+ be->can_write = false;
+ _proxy_evthr_evset_be_conn(be, &t->tunables.connect_ur);
+ } else {
+ _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
+ _backend_failed_ur(be);
+ }
+ }
+ }
+
+}
+
static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
proxy_event_thread_t *t = udata;
@@ -290,6 +499,64 @@ static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
}
}
+static void _proxy_evthr_evset_be_readvalidate(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts) {
+ P_DEBUG("%s: setting: %lu\n", __func__, len);
+ struct io_uring_sqe *sqe;
+ if (be->ur_rd_ev.set) {
+ P_DEBUG("%s: already set\n", __func__);
+ return;
+ }
+
+ be->ur_rd_ev.cb = proxy_backend_beconn_validate_ur;
+ be->ur_rd_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // FIXME (v2): NULL?
+ assert(be->rbuf != NULL);
+ io_uring_prep_recv(sqe, mcmc_fd(be->client), buf, len, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_rd_ev);
+ be->ur_rd_ev.set = true;
+
+ sqe->flags |= IOSQE_IO_LINK;
+
+ // add a timeout.
+ be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
+ be->ur_te_ev.udata = be;
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+
+ io_uring_prep_link_timeout(sqe, ts, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+}
+
+// reuse the write handler event for pending connections.
+static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timespec *ts) {
+ struct io_uring_sqe *sqe;
+ P_DEBUG("%s: setting\n", __func__);
+ if (be->ur_wr_ev.set)
+ return;
+
+ be->ur_wr_ev.cb = proxy_backend_beconn_ur;
+ be->ur_wr_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // FIXME (v2): NULL?
+
+ io_uring_prep_poll_add(sqe, mcmc_fd(be->client), POLLOUT);
+ io_uring_sqe_set_data(sqe, &be->ur_wr_ev);
+ be->ur_wr_ev.set = true;
+
+ sqe->flags |= IOSQE_IO_LINK;
+
+ // add a timeout.
+ // FIXME: do I need to change this at all?
+ be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
+ be->ur_te_ev.udata = be;
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+
+ io_uring_prep_link_timeout(sqe, ts, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+}
+
static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts) {
struct io_uring_sqe *sqe;
if (be->ur_wr_ev.set)
@@ -357,6 +624,21 @@ static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) {
t->ur_clock_event.set = true;
}
+static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t) {
+ struct io_uring_sqe *sqe;
+ P_DEBUG("%s: setting: %d\n", __func__, t->ur_benotify_event.set);
+ if (t->ur_benotify_event.set)
+ return;
+
+ t->ur_benotify_event.cb = proxy_beconn_handler_ur;
+ t->ur_benotify_event.udata = t;
+
+ sqe = io_uring_get_sqe(&t->ring);
+ // FIXME (v2): NULL?
+ io_uring_prep_read(sqe, t->be_event_fd, &t->beevent_counter, sizeof(eventfd_t), 0);
+ io_uring_sqe_set_data(sqe, &t->ur_benotify_event);
+}
+
static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) {
struct io_uring_sqe *sqe;
P_DEBUG("%s: setting: %d\n", __func__, t->ur_notify_event.set);
@@ -450,6 +732,11 @@ static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
}
static void _cleanup_backend(mcp_backend_t *be) {
+#ifdef HAVE_LIBURING
+ if (be->event_thread->use_uring) {
+ // TODO: cancel any live uring events.
+ } else {
+#endif
// remove any pending events.
int pending = 0;
if (event_initialized(&be->event)) {
@@ -458,6 +745,9 @@ static void _cleanup_backend(mcp_backend_t *be) {
if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) {
event_del(&be->event); // an error to call event_del() without event.
}
+#ifdef HAVE_LIBURING
+ }
+#endif
// - assert on empty queue
assert(STAILQ_EMPTY(&be->io_head));
@@ -1058,25 +1348,6 @@ static int _flush_pending_write(mcp_backend_t *be) {
return flags;
}
-static int _beconn_send_validate(mcp_backend_t *be) {
- const char *str = "version\r\n";
- const ssize_t len = strlen(str);
-
- ssize_t res = write(mcmc_fd(be->client), str, len);
-
- if (res == -1) {
- return -1;
- }
-
- // I'm making an opinionated statement that we should be able to write
- // "version\r\n" into a fresh socket without hitting EAGAIN.
- if (res < len) {
- return -1;
- }
-
- return 1;
-}
-
// Libevent handler for backends in a connecting state.
static void proxy_beconn_handler(const int fd, const short which, void *arg) {
assert(arg != NULL);
@@ -1095,35 +1366,10 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
be->can_write = true;
if (be->connecting) {
- int err = 0;
- // We were connecting, now ensure we're properly connected.
- if (mcmc_check_nonblock_connect(be->client, &err) != MCMC_OK) {
- // kick the bad backend, clear the queue, retry later.
- // FIXME (v2): if a connect fails, anything currently in the queue
- // should be safe to hold up until their timeout.
- _reset_bad_backend(be, P_BE_FAIL_CONNECTING);
- _backend_failed(be);
- P_DEBUG("%s: backend failed to connect (%s:%s)\n", __func__, be->name, be->port);
+ if (_proxy_beconn_checkconnect(be) == -1) {
return;
}
- P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->name, be->port);
- be->connecting = false;
- be->state = mcp_backend_read;
- be->bad = false;
- be->failed_count = 0;
-
- be->validating = true;
- // TODO: make validation optional.
-
- if (_beconn_send_validate(be) == -1) {
- _reset_bad_backend(be, P_BE_FAIL_BADVALIDATE);
- _backend_failed(be);
- return;
- } else {
- // buffer should be empty during validation stage.
- assert(be->rbufused == 0);
- _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
- }
+ _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
}
// TODO: currently never taken, until validation is made optional.
@@ -1305,12 +1551,20 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
// FIXME (v2): Sigh. we need a blocking event_fd for io_uring but we've a
// chicken and egg in here. need a better structure... in meantime
// re-create the event_fd.
+
+ // set the new request handler.
close(t->event_fd);
t->event_fd = eventfd(0, 0);
// FIXME (v2): hack for event init.
t->ur_notify_event.set = false;
_proxy_evthr_evset_notifier(t);
+ // set the new backend connection handler.
+ close(t->be_event_fd);
+ t->be_event_fd = eventfd(0, 0);
+ t->ur_benotify_event.set = false;
+ _proxy_evthr_evset_benotifier(t);
+
// periodic data updater for event thread
t->ur_clock_event.cb = proxy_event_updater_ur;
t->ur_clock_event.udata = t;