diff options
author | dormando <dormando@rydia.net> | 2022-02-03 14:03:31 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2022-02-04 13:56:25 -0800 |
commit | 3614f3cf495e1e2b25afde8d4e2e3c0c30bd0dac (patch) | |
tree | 2c3f16dbc115510505df25b736447494459d0e5e /proto_proxy.c | |
parent | 838fda3fdacfd16511c24c5638b8e2de49224c39 (diff) | |
download | memcached-3614f3cf495e1e2b25afde8d4e2e3c0c30bd0dac.tar.gz |
proxy: add mcp.backend_failure_limit(n)
refactors the timeout management code into a generic "tunables" struct and
adds the backend failure limit as a configuration option.
Diffstat (limited to 'proto_proxy.c')
-rw-r--r-- | proto_proxy.c | 91 |
1 files changed, 52 insertions, 39 deletions
diff --git a/proto_proxy.c b/proto_proxy.c index 0a37f90..f8bb504 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -171,7 +171,7 @@ struct proxy_global_stats { uint64_t backend_errors; // errors from backends }; -struct proxy_timeouts { +struct proxy_tunables { struct timeval connect; struct timeval retry; // wait time before retrying a dead backend struct timeval read; @@ -180,6 +180,7 @@ struct proxy_timeouts { struct __kernel_timespec retry_ur; struct __kernel_timespec read_ur; #endif // HAVE_LIBURING + int backend_failure_limit; }; typedef STAILQ_HEAD(pool_head_s, mcp_pool_s) pool_head_t; @@ -201,7 +202,7 @@ typedef struct { bool use_uring; // use IO_URING for backend connections. struct proxy_global_stats global_stats; struct proxy_user_stats user_stats; - struct proxy_timeouts timeouts; // NOTE: updates covered by stats_lock + struct proxy_tunables tunables; // NOTE: updates covered by stats_lock pthread_mutex_t stats_lock; // used for rare global counters } proxy_ctx_t; @@ -334,7 +335,7 @@ struct proxy_event_thread_s { int notify_send_fd; #endif proxy_ctx_t *ctx; // main context. - struct proxy_timeouts timeouts; // periodically copied from main ctx + struct proxy_tunables tunables; // periodically copied from main ctx }; #define RESP_CMD_MAX 8 @@ -767,13 +768,14 @@ void proxy_init(bool use_uring) { pthread_mutex_init(&ctx->stats_lock, NULL); // FIXME (v2): default defines. 
- ctx->timeouts.connect.tv_sec = 5; - ctx->timeouts.retry.tv_sec = 3; - ctx->timeouts.read.tv_sec = 3; + ctx->tunables.backend_failure_limit = 3; + ctx->tunables.connect.tv_sec = 5; + ctx->tunables.retry.tv_sec = 3; + ctx->tunables.read.tv_sec = 3; #ifdef HAVE_LIBURING - ctx->timeouts.connect_ur.tv_sec = 5; - ctx->timeouts.retry_ur.tv_sec = 3; - ctx->timeouts.read_ur.tv_sec = 3; + ctx->tunables.connect_ur.tv_sec = 5; + ctx->tunables.retry_ur.tv_sec = 3; + ctx->tunables.read_ur.tv_sec = 3; #endif // HAVE_LIBURING STAILQ_INIT(&ctx->manager_head); @@ -816,7 +818,7 @@ void proxy_init(bool use_uring) { pthread_mutex_init(&t->mutex, NULL); pthread_cond_init(&t->cond, NULL); - memcpy(&t->timeouts, &ctx->timeouts, sizeof(t->timeouts)); + memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables)); #ifdef HAVE_LIBURING if (t->use_uring) { @@ -1384,13 +1386,10 @@ static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) { // we reuse the "global stats" lock since it's hardly ever used. STAT_L(ctx); - memcpy(&t->timeouts, &ctx->timeouts, sizeof(t->timeouts)); + memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables)); STAT_UL(ctx); } -// TODO: move define or move to option. -#define BACKEND_FAILURE_LIMIT 3 - // No-op at the moment. when the linked timeout fires uring returns the // linked request (read/write/poll/etc) with an interrupted/timeout/cancelled // error. So we don't need to explicitly handle timeouts. 
@@ -1402,7 +1401,7 @@ static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *c static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) { mcp_backend_t *be = udata; - _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->timeouts.connect_ur); + _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur); } static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) { @@ -1416,18 +1415,18 @@ static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) { sqe = io_uring_get_sqe(&be->event_thread->ring); // TODO (v2): NULL? - io_uring_prep_timeout(sqe, &be->event_thread->timeouts.retry_ur, 0, 0); + io_uring_prep_timeout(sqe, &be->event_thread->tunables.retry_ur, 0, 0); io_uring_sqe_set_data(sqe, &be->ur_te_ev); be->ur_te_ev.set = true; } static void _backend_failed_ur(mcp_backend_t *be) { - if (++be->failed_count > BACKEND_FAILURE_LIMIT) { + if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) { P_DEBUG("%s: marking backend as bad\n", __func__); be->bad = true; _proxy_evthr_evset_be_retry(be); } else { - _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->timeouts.retry_ur); + _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.retry_ur); } } @@ -1451,7 +1450,7 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) { P_DEBUG("%s: bread: %d res: %d toread: %lu\n", __func__, bread, res, toread); if (res > 0) { - _proxy_evthr_evset_be_read(be, rbuf, toread, &be->event_thread->timeouts.read_ur); + _proxy_evthr_evset_be_read(be, rbuf, toread, &be->event_thread->tunables.read_ur); } else if (res == -1) { _reset_bad_backend(be); return; @@ -1459,7 +1458,7 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) { // TODO (v2): when exactly do we need to reset the backend handler? 
if (!STAILQ_EMPTY(&be->io_head)) { - _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->timeouts.read_ur); + _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur); } } @@ -1494,10 +1493,10 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) { } if (flags & EV_WRITE) { - _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->timeouts.connect_ur); + _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur); } - _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->timeouts.read_ur); + _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur); } static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) { @@ -1549,10 +1548,10 @@ static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) { // FIXME (v2): can't actually set the read here? need to confirm _some_ // write first? if (flags & EV_WRITE) { - _proxy_evthr_evset_be_wrpoll(be, &t->timeouts.connect_ur); + _proxy_evthr_evset_be_wrpoll(be, &t->tunables.connect_ur); } if (flags & EV_READ) { - _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &t->timeouts.read_ur); + _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &t->tunables.read_ur); } } } @@ -1712,7 +1711,7 @@ static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) { // we reuse the "global stats" lock since it's hardly ever used. STAT_L(ctx); - memcpy(&t->timeouts, &ctx->timeouts, sizeof(t->timeouts)); + memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables)); STAT_UL(ctx); } @@ -1749,7 +1748,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { // Re-walk each backend and check set event as required. mcp_backend_t *be = NULL; // FIXME: should this be timeouts.read? 
- struct timeval tmp_time = t->timeouts.connect; + struct timeval tmp_time = t->tunables.connect; // FIXME (v2): _set_event() is buggy, see notes on function. STAILQ_FOREACH(be, &t->be_head, be_next) { @@ -2229,14 +2228,11 @@ static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf return flags; } -// TODO: surface to option -#define BACKEND_FAILURE_LIMIT 3 - // All we need to do here is schedule the backend to attempt to connect again. static void proxy_backend_retry_handler(const int fd, const short which, void *arg) { mcp_backend_t *be = arg; assert(which & EV_TIMEOUT); - struct timeval tmp_time = be->event_thread->timeouts.retry; + struct timeval tmp_time = be->event_thread->tunables.retry; _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler); } @@ -2254,8 +2250,8 @@ static void proxy_backend_retry_handler(const int fd, const short which, void *a // block them instead. That's more challenging so leaving a note instead // of doing this now :) static void _backend_failed(mcp_backend_t *be) { - struct timeval tmp_time = be->event_thread->timeouts.retry; - if (++be->failed_count > BACKEND_FAILURE_LIMIT) { + struct timeval tmp_time = be->event_thread->tunables.retry; + if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) { P_DEBUG("%s: marking backend as bad\n", __func__); be->bad = true; _set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler); @@ -2396,7 +2392,7 @@ static int _flush_pending_write(mcp_backend_t *be) { static void proxy_backend_handler(const int fd, const short which, void *arg) { mcp_backend_t *be = arg; int flags = EV_TIMEOUT; - struct timeval tmp_time = be->event_thread->timeouts.read; + struct timeval tmp_time = be->event_thread->tunables.read; if (which & EV_TIMEOUT) { P_DEBUG("%s: timeout received, killing backend queue\n", __func__); @@ -3061,15 +3057,31 @@ static int mcplib_pool_proxy_call(lua_State *L) { return lua_yield(L, 
2); } +static int mcplib_backend_failure_limit(lua_State *L) { + int limit = luaL_checkinteger(L, -1); + proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. + + if (limit < 0) { + proxy_lua_error(L, "backend_failure_limit must be >= 0"); + return 0; + } + + STAT_L(ctx); + ctx->tunables.backend_failure_limit = limit; + STAT_UL(ctx); + + return 0; +} + // TODO: take fractional time and convert. static int mcplib_backend_connect_timeout(lua_State *L) { int seconds = luaL_checkinteger(L, -1); proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. STAT_L(ctx); - ctx->timeouts.connect.tv_sec = seconds; + ctx->tunables.connect.tv_sec = seconds; #ifdef HAVE_LIBURING - ctx->timeouts.connect_ur.tv_sec = seconds; + ctx->tunables.connect_ur.tv_sec = seconds; #endif STAT_UL(ctx); @@ -3081,9 +3093,9 @@ static int mcplib_backend_retry_timeout(lua_State *L) { proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. STAT_L(ctx); - ctx->timeouts.retry.tv_sec = seconds; + ctx->tunables.retry.tv_sec = seconds; #ifdef HAVE_LIBURING - ctx->timeouts.retry_ur.tv_sec = seconds; + ctx->tunables.retry_ur.tv_sec = seconds; #endif STAT_UL(ctx); @@ -3095,9 +3107,9 @@ static int mcplib_backend_read_timeout(lua_State *L) { proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue. STAT_L(ctx); - ctx->timeouts.read.tv_sec = seconds; + ctx->tunables.read.tv_sec = seconds; #ifdef HAVE_LIBURING - ctx->timeouts.read_ur.tv_sec = seconds; + ctx->tunables.read_ur.tv_sec = seconds; #endif STAT_UL(ctx); @@ -4329,6 +4341,7 @@ int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) { {"backend_connect_timeout", mcplib_backend_connect_timeout}, {"backend_retry_timeout", mcplib_backend_retry_timeout}, {"backend_read_timeout", mcplib_backend_read_timeout}, + {"backend_failure_limit", mcplib_backend_failure_limit}, {NULL, NULL} }; |