author     dormando <dormando@rydia.net>    2021-12-04 16:09:31 -0800
committer  dormando <dormando@rydia.net>    2022-01-12 14:07:22 -0800
commit     5a04d0fba2b05dfb7002c5ccc1d6afcebf7315cb (patch)
tree       53ffac129dfb219b1c64b1c0dc954351c1ff29dc
parent     305b7864e6a0bc9130d1cc6dd38feeefae8e4a79 (diff)
download   memcached-5a04d0fba2b05dfb7002c5ccc1d6afcebf7315cb.tar.gz
proxy: bad backend retry code for uring
-rw-r--r--  proto_proxy.c  70
1 file changed, 60 insertions(+), 10 deletions(-)
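The fix below hinges on a liburing pattern: an I/O SQE is chained to a timeout SQE via IOSQE_IO_LINK, so if the timeout expires first the kernel cancels the linked request and both submissions complete with CQEs. The commit gives those timeout SQEs a real proxy_event_t as user data (the new ur_te_ev field) instead of NULL, and adds a connect-failure counter plus a standalone retry timeout for backends that keep failing. A minimal standalone sketch of the linked-timeout shape follows; it is not code from the patch, and the names (ring, fd, ts, the udata pointers) are illustrative assumptions:

/* Sketch: arm a write-poll with a linked timeout. If ts expires first,
 * the kernel cancels the poll and both CQEs still carry dispatchable
 * user data. Error handling (NULL sqe on a full SQ ring) is omitted. */
#include <poll.h>
#include <liburing.h>

static void poll_with_timeout(struct io_uring *ring, int fd,
                              struct __kernel_timespec *ts,
                              void *poll_udata, void *timeout_udata) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_poll_add(sqe, fd, POLLOUT);
    io_uring_sqe_set_data(sqe, poll_udata);
    sqe->flags |= IOSQE_IO_LINK;  /* link the timeout SQE that follows */

    sqe = io_uring_get_sqe(ring);
    io_uring_prep_link_timeout(sqe, ts, 0);
    /* the patch points this at &be->ur_te_ev rather than 0/NULL, so the
     * completion loop no longer needs a NULL check for timeout CQEs */
    io_uring_sqe_set_data(sqe, timeout_udata);

    io_uring_submit(ring);
}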
diff --git a/proto_proxy.c b/proto_proxy.c
index 9de1e9b..6d9c2f3 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -283,6 +283,7 @@ struct mcp_backend_s {
#ifdef HAVE_LIBURING
proxy_event_t ur_rd_ev; // liburing.
proxy_event_t ur_wr_ev; // need a separate event/cb for writing/polling
+ proxy_event_t ur_te_ev; // for timeout handling
#endif
enum mcp_backend_states state; // readback state machine
bool connecting; // in the process of an asynch connection.
@@ -1289,6 +1290,50 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
#ifdef HAVE_LIBURING
static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts);
+static void _proxy_evthr_evset_be_retry(mcp_backend_t *be);
+
+// TODO: move this define elsewhere or make it a configurable option.
+#define BACKEND_FAILURE_LIMIT 3
+
+// No-op at the moment. When the linked timeout fires, uring returns the
+// linked request (read/write/poll/etc) with an interrupted/timeout/cancelled
+// error, so we don't need to explicitly handle timeouts.
+// I'm leaving the structure in to simplify the callback routine.
+// Since timeouts rarely fire, the extra code here shouldn't matter.
+static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ return;
+}
+
+static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+ mcp_backend_t *be = udata;
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->timeouts.connect_ur);
+}
+
+static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
+ struct io_uring_sqe *sqe;
+ if (be->ur_te_ev.set)
+ return;
+
+ be->ur_te_ev.cb = proxy_backend_retry_handler_ur;
+ be->ur_te_ev.udata = be;
+
+ sqe = io_uring_get_sqe(&be->event_thread->ring);
+ // FIXME: handle NULL return (submission queue full)?
+
+ io_uring_prep_timeout(sqe, &be->event_thread->timeouts.retry_ur, 0, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+ be->ur_te_ev.set = true;
+}
+
+static void _backend_failed_ur(mcp_backend_t *be) {
+ if (++be->failed_count > BACKEND_FAILURE_LIMIT) {
+ P_DEBUG("%s: marking backend as bad\n", __func__);
+ be->bad = true;
+ _proxy_evthr_evset_be_retry(be);
+ } else {
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->timeouts.retry_ur);
+ }
+}
// read handler.
static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
@@ -1299,6 +1344,10 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
// Error or disconnection.
if (bread <= 0) {
_reset_bad_backend(be);
+ // NOTE: Not calling _backend_failed_ur here since if the backend is busted
+ // it should be caught by the connect routine.
+ // This is probably not _always_ true in practice. Leaving this note
+ // so I can re-evaluate later.
return;
}
@@ -1331,7 +1380,7 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
// FIXME: if a connect fails, anything currently in the queue
// should be safe to hold up until their timeout.
_reset_bad_backend(be);
- //_backend_failed(be); // FIXME: need a uring version of this.
+ _backend_failed_ur(be);
P_DEBUG("%s: backend failed to connect\n", __func__);
return;
}
@@ -1356,6 +1405,7 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
}
if (res == -1) {
_reset_bad_backend(be);
+ // FIXME: should this also call _backend_failed_ur?
return;
}
@@ -1447,12 +1497,13 @@ static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_time
sqe->flags |= IOSQE_IO_LINK;
- // TODO: special timeout callback that we ignore?
// add a timeout.
+ be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
+ be->ur_te_ev.udata = be;
sqe = io_uring_get_sqe(&be->event_thread->ring);
io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
}
static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts) {
@@ -1476,10 +1527,13 @@ static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len,
sqe->flags |= IOSQE_IO_LINK;
// add a timeout.
+ // TODO: we can pre-set the event data and avoid always re-doing it here.
+ be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
+ be->ur_te_ev.udata = be;
sqe = io_uring_get_sqe(&be->event_thread->ring);
io_uring_prep_link_timeout(sqe, ts, 0);
- io_uring_sqe_set_data(sqe, 0);
+ io_uring_sqe_set_data(sqe, &be->ur_te_ev);
}
static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) {
@@ -1537,12 +1591,8 @@ static void *proxy_event_thread_ur(void *arg) {
P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count);
proxy_event_t *pe = io_uring_cqe_get_data(cqe);
- if (pe) {
- pe->set = false;
- pe->cb(pe->udata, cqe);
- } else {
- P_DEBUG("%s: probably got a CQE for a timeout? [%d]\n", __func__, cqe->res);
- }
+ pe->set = false;
+ pe->cb(pe->udata, cqe);
count++;
}