diff options
author | dormando <dormando@rydia.net> | 2021-12-04 16:09:31 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2022-01-12 14:07:22 -0800 |
commit | 5a04d0fba2b05dfb7002c5ccc1d6afcebf7315cb (patch) | |
tree | 53ffac129dfb219b1c64b1c0dc954351c1ff29dc | |
parent | 305b7864e6a0bc9130d1cc6dd38feeefae8e4a79 (diff) | |
download | memcached-5a04d0fba2b05dfb7002c5ccc1d6afcebf7315cb.tar.gz |
proxy: bad backend retry code for uring
-rw-r--r-- | proto_proxy.c | 70 |
1 file changed, 60 insertions, 10 deletions
```diff
diff --git a/proto_proxy.c b/proto_proxy.c
index 9de1e9b..6d9c2f3 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -283,6 +283,7 @@ struct mcp_backend_s {
 #ifdef HAVE_LIBURING
     proxy_event_t ur_rd_ev; // liburing.
     proxy_event_t ur_wr_ev; // need a separate event/cb for writing/polling
+    proxy_event_t ur_te_ev; // for timeout handling
 #endif
     enum mcp_backend_states state; // readback state machine
     bool connecting; // in the process of an asynch connection.
@@ -1289,6 +1290,50 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
 #ifdef HAVE_LIBURING
 static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
 static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_timespec *ts);
+static void _proxy_evthr_evset_be_retry(mcp_backend_t *be);
+
+// TODO: move define or move to option.
+#define BACKEND_FAILURE_LIMIT 3
+
+// No-op at the moment. when the linked timeout fires uring returns the
+// linked request (read/write/poll/etc) with an interrupted/timeout/cancelled
+// error. So we don't need to explicitly handle timeouts.
+// I'm leaving the structure in to simplify the callback routine.
+// Since timeouts rarely get called the extra code here shouldn't matter.
+static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+    return;
+}
+
+static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) {
+    mcp_backend_t *be = udata;
+    _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->timeouts.connect_ur);
+}
+
+static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
+    struct io_uring_sqe *sqe;
+    if (be->ur_te_ev.set)
+        return;
+
+    be->ur_te_ev.cb = proxy_backend_retry_handler_ur;
+    be->ur_te_ev.udata = be;
+
+    sqe = io_uring_get_sqe(&be->event_thread->ring);
+    // FIXME: NULL?
+
+    io_uring_prep_timeout(sqe, &be->event_thread->timeouts.retry_ur, 0, 0);
+    io_uring_sqe_set_data(sqe, &be->ur_te_ev);
+    be->ur_te_ev.set = true;
+}
+
+static void _backend_failed_ur(mcp_backend_t *be) {
+    if (++be->failed_count > BACKEND_FAILURE_LIMIT) {
+        P_DEBUG("%s: marking backend as bad\n", __func__);
+        be->bad = true;
+        _proxy_evthr_evset_be_retry(be);
+    } else {
+        _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->timeouts.retry_ur);
+    }
+}

 // read handler.
 static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
@@ -1299,6 +1344,10 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
     // Error or disconnection.
     if (bread <= 0) {
         _reset_bad_backend(be);
+        // NOTE: Not calling backed_failed here since if the backend is busted
+        // it should be caught by the connect routine.
+        // This is probably not _always_ true in practice. Leaving this note
+        // so I can re-evaluate later.
         return;
     }

@@ -1331,7 +1380,7 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
             // FIXME: if a connect fails, anything currently in the queue
             // should be safe to hold up until their timeout.
             _reset_bad_backend(be);
-            //_backend_failed(be); // FIXME: need a uring version of this.
+            _backend_failed_ur(be);
             P_DEBUG("%s: backend failed to connect\n", __func__);
             return;
         }
@@ -1356,6 +1405,7 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
     }
     if (res == -1) {
         _reset_bad_backend(be);
+        // FIXME: backend_failed?
         return;
     }

@@ -1447,12 +1497,13 @@ static void _proxy_evthr_evset_be_wrpoll(mcp_backend_t *be, struct __kernel_time

     sqe->flags |= IOSQE_IO_LINK;

-    // TODO: special timeout callback that we ignore?
     // add a timeout.
+    be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
+    be->ur_te_ev.udata = be;
     sqe = io_uring_get_sqe(&be->event_thread->ring);

     io_uring_prep_link_timeout(sqe, ts, 0);
-    io_uring_sqe_set_data(sqe, 0);
+    io_uring_sqe_set_data(sqe, &be->ur_te_ev);
 }

 static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts) {
@@ -1476,10 +1527,13 @@ static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len,
     sqe->flags |= IOSQE_IO_LINK;

     // add a timeout.
+    // TODO: we can pre-set the event data and avoid always re-doing it here.
+    be->ur_te_ev.cb = proxy_backend_timeout_handler_ur;
+    be->ur_te_ev.udata = be;
     sqe = io_uring_get_sqe(&be->event_thread->ring);

     io_uring_prep_link_timeout(sqe, ts, 0);
-    io_uring_sqe_set_data(sqe, 0);
+    io_uring_sqe_set_data(sqe, &be->ur_te_ev);
 }

 static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t) {
@@ -1537,12 +1591,8 @@ static void *proxy_event_thread_ur(void *arg) {
             P_DEBUG("%s: got a CQE [count:%d]\n", __func__, count);

             proxy_event_t *pe = io_uring_cqe_get_data(cqe);
-            if (pe) {
-                pe->set = false;
-                pe->cb(pe->udata, cqe);
-            } else {
-                P_DEBUG("%s: probably got a CQE for a timeout? [%d]\n", __func__, cqe->res);
-            }
+            pe->set = false;
+            pe->cb(pe->udata, cqe);

             count++;
         }
```