author    dormando <dormando@rydia.net>  2022-02-03 14:03:31 -0800
committer dormando <dormando@rydia.net>  2022-02-04 13:56:25 -0800
commit    3614f3cf495e1e2b25afde8d4e2e3c0c30bd0dac (patch)
tree      2c3f16dbc115510505df25b736447494459d0e5e /proto_proxy.c
parent    838fda3fdacfd16511c24c5638b8e2de49224c39 (diff)
download  memcached-3614f3cf495e1e2b25afde8d4e2e3c0c30bd0dac.tar.gz
proxy: add mcp.backend_failure_limit(n)
Refactors the timeout management code into generic "tunables" and adds the backend failure limit as a configuration option.
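
A minimal usage sketch from a proxy Lua config, assuming the standard mcp_config_pools entry point and mcp.backend()/mcp.pool() helpers (those names are assumptions; only mcp.backend_failure_limit is added by this commit). Per the diff, a backend is marked bad once its consecutive failure count exceeds the configured limit, and the built-in default is 3:

    -- hypothetical config sketch; timeouts take whole seconds
    -- (fractional time conversion is still a TODO in the code)
    function mcp_config_pools(oldss)
        mcp.backend_failure_limit(5)   -- mark bad after more than 5 straight failures
        mcp.backend_connect_timeout(5)
        mcp.backend_retry_timeout(3)
        mcp.backend_read_timeout(3)
        local b1 = mcp.backend('b1', '127.0.0.1', 11212)
        return { mcp.pool({b1}) }
    end

A negative limit raises a Lua error ("backend_failure_limit must be >= 0"); like the timeouts, the new value is copied from the global context to each event thread by the periodic tunables updater.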
Diffstat (limited to 'proto_proxy.c')
-rw-r--r--  proto_proxy.c  91
1 file changed, 52 insertions(+), 39 deletions(-)
diff --git a/proto_proxy.c b/proto_proxy.c
index 0a37f90..f8bb504 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -171,7 +171,7 @@ struct proxy_global_stats {
uint64_t backend_errors; // errors from backends
};
-struct proxy_timeouts {
+struct proxy_tunables {
struct timeval connect;
struct timeval retry; // wait time before retrying a dead backend
struct timeval read;
@@ -180,6 +180,7 @@ struct proxy_timeouts {
struct __kernel_timespec retry_ur;
struct __kernel_timespec read_ur;
#endif // HAVE_LIBURING
+ int backend_failure_limit;
};
typedef STAILQ_HEAD(pool_head_s, mcp_pool_s) pool_head_t;
@@ -201,7 +202,7 @@ typedef struct {
bool use_uring; // use IO_URING for backend connections.
struct proxy_global_stats global_stats;
struct proxy_user_stats user_stats;
- struct proxy_timeouts timeouts; // NOTE: updates covered by stats_lock
+ struct proxy_tunables tunables; // NOTE: updates covered by stats_lock
pthread_mutex_t stats_lock; // used for rare global counters
} proxy_ctx_t;
@@ -334,7 +335,7 @@ struct proxy_event_thread_s {
int notify_send_fd;
#endif
proxy_ctx_t *ctx; // main context.
- struct proxy_timeouts timeouts; // periodically copied from main ctx
+ struct proxy_tunables tunables; // periodically copied from main ctx
};
#define RESP_CMD_MAX 8
@@ -767,13 +768,14 @@ void proxy_init(bool use_uring) {
pthread_mutex_init(&ctx->stats_lock, NULL);
// FIXME (v2): default defines.
- ctx->timeouts.connect.tv_sec = 5;
- ctx->timeouts.retry.tv_sec = 3;
- ctx->timeouts.read.tv_sec = 3;
+ ctx->tunables.backend_failure_limit = 3;
+ ctx->tunables.connect.tv_sec = 5;
+ ctx->tunables.retry.tv_sec = 3;
+ ctx->tunables.read.tv_sec = 3;
#ifdef HAVE_LIBURING
- ctx->timeouts.connect_ur.tv_sec = 5;
- ctx->timeouts.retry_ur.tv_sec = 3;
- ctx->timeouts.read_ur.tv_sec = 3;
+ ctx->tunables.connect_ur.tv_sec = 5;
+ ctx->tunables.retry_ur.tv_sec = 3;
+ ctx->tunables.read_ur.tv_sec = 3;
#endif // HAVE_LIBURING
STAILQ_INIT(&ctx->manager_head);
@@ -816,7 +818,7 @@ void proxy_init(bool use_uring) {
pthread_mutex_init(&t->mutex, NULL);
pthread_cond_init(&t->cond, NULL);
- memcpy(&t->timeouts, &ctx->timeouts, sizeof(t->timeouts));
+ memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
#ifdef HAVE_LIBURING
if (t->use_uring) {
@@ -1384,13 +1386,10 @@ static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) {
// we reuse the "global stats" lock since it's hardly ever used.
STAT_L(ctx);
- memcpy(&t->timeouts, &ctx->timeouts, sizeof(t->timeouts));
+ memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
STAT_UL(ctx);
}
-// TODO: move define or move to option.
-#define BACKEND_FAILURE_LIMIT 3
-
// No-op at the moment. when the linked timeout fires uring returns the
// linked request (read/write/poll/etc) with an interrupted/timeout/cancelled
// error. So we don't need to explicitly handle timeouts.
@@ -1402,7 +1401,7 @@ static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *c
static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) {
mcp_backend_t *be = udata;
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->timeouts.connect_ur);
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur);
}
static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
@@ -1416,18 +1415,18 @@ static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
sqe = io_uring_get_sqe(&be->event_thread->ring);
// TODO (v2): NULL?
- io_uring_prep_timeout(sqe, &be->event_thread->timeouts.retry_ur, 0, 0);
+ io_uring_prep_timeout(sqe, &be->event_thread->tunables.retry_ur, 0, 0);
io_uring_sqe_set_data(sqe, &be->ur_te_ev);
be->ur_te_ev.set = true;
}
static void _backend_failed_ur(mcp_backend_t *be) {
- if (++be->failed_count > BACKEND_FAILURE_LIMIT) {
+ if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
P_DEBUG("%s: marking backend as bad\n", __func__);
be->bad = true;
_proxy_evthr_evset_be_retry(be);
} else {
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->timeouts.retry_ur);
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.retry_ur);
}
}
@@ -1451,7 +1450,7 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
P_DEBUG("%s: bread: %d res: %d toread: %lu\n", __func__, bread, res, toread);
if (res > 0) {
- _proxy_evthr_evset_be_read(be, rbuf, toread, &be->event_thread->timeouts.read_ur);
+ _proxy_evthr_evset_be_read(be, rbuf, toread, &be->event_thread->tunables.read_ur);
} else if (res == -1) {
_reset_bad_backend(be);
return;
@@ -1459,7 +1458,7 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
// TODO (v2): when exactly do we need to reset the backend handler?
if (!STAILQ_EMPTY(&be->io_head)) {
- _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->timeouts.read_ur);
+ _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
}
}
@@ -1494,10 +1493,10 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
}
if (flags & EV_WRITE) {
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->timeouts.connect_ur);
+ _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.connect_ur);
}
- _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->timeouts.read_ur);
+ _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
}
static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
@@ -1549,10 +1548,10 @@ static void proxy_event_handler_ur(void *udata, struct io_uring_cqe *cqe) {
// FIXME (v2): can't actually set the read here? need to confirm _some_
// write first?
if (flags & EV_WRITE) {
- _proxy_evthr_evset_be_wrpoll(be, &t->timeouts.connect_ur);
+ _proxy_evthr_evset_be_wrpoll(be, &t->tunables.connect_ur);
}
if (flags & EV_READ) {
- _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &t->timeouts.read_ur);
+ _proxy_evthr_evset_be_read(be, be->rbuf, READ_BUFFER_SIZE, &t->tunables.read_ur);
}
}
}
@@ -1712,7 +1711,7 @@ static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
// we reuse the "global stats" lock since it's hardly ever used.
STAT_L(ctx);
- memcpy(&t->timeouts, &ctx->timeouts, sizeof(t->timeouts));
+ memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
STAT_UL(ctx);
}
@@ -1749,7 +1748,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
// Re-walk each backend and check set event as required.
mcp_backend_t *be = NULL;
// FIXME: should this be timeouts.read?
- struct timeval tmp_time = t->timeouts.connect;
+ struct timeval tmp_time = t->tunables.connect;
// FIXME (v2): _set_event() is buggy, see notes on function.
STAILQ_FOREACH(be, &t->be_head, be_next) {
@@ -2229,14 +2228,11 @@ static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf
return flags;
}
-// TODO: surface to option
-#define BACKEND_FAILURE_LIMIT 3
-
// All we need to do here is schedule the backend to attempt to connect again.
static void proxy_backend_retry_handler(const int fd, const short which, void *arg) {
mcp_backend_t *be = arg;
assert(which & EV_TIMEOUT);
- struct timeval tmp_time = be->event_thread->timeouts.retry;
+ struct timeval tmp_time = be->event_thread->tunables.retry;
_set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_backend_handler);
}
@@ -2254,8 +2250,8 @@ static void proxy_backend_retry_handler(const int fd, const short which, void *a
// block them instead. That's more challenging so leaving a note instead
// of doing this now :)
static void _backend_failed(mcp_backend_t *be) {
- struct timeval tmp_time = be->event_thread->timeouts.retry;
- if (++be->failed_count > BACKEND_FAILURE_LIMIT) {
+ struct timeval tmp_time = be->event_thread->tunables.retry;
+ if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
P_DEBUG("%s: marking backend as bad\n", __func__);
be->bad = true;
_set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler);
@@ -2396,7 +2392,7 @@ static int _flush_pending_write(mcp_backend_t *be) {
static void proxy_backend_handler(const int fd, const short which, void *arg) {
mcp_backend_t *be = arg;
int flags = EV_TIMEOUT;
- struct timeval tmp_time = be->event_thread->timeouts.read;
+ struct timeval tmp_time = be->event_thread->tunables.read;
if (which & EV_TIMEOUT) {
P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
@@ -3061,15 +3057,31 @@ static int mcplib_pool_proxy_call(lua_State *L) {
return lua_yield(L, 2);
}
+static int mcplib_backend_failure_limit(lua_State *L) {
+ int limit = luaL_checkinteger(L, -1);
+ proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
+
+ if (limit < 0) {
+ proxy_lua_error(L, "backend_failure_limit must be >= 0");
+ return 0;
+ }
+
+ STAT_L(ctx);
+ ctx->tunables.backend_failure_limit = limit;
+ STAT_UL(ctx);
+
+ return 0;
+}
+
// TODO: take fractional time and convert.
static int mcplib_backend_connect_timeout(lua_State *L) {
int seconds = luaL_checkinteger(L, -1);
proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
STAT_L(ctx);
- ctx->timeouts.connect.tv_sec = seconds;
+ ctx->tunables.connect.tv_sec = seconds;
#ifdef HAVE_LIBURING
- ctx->timeouts.connect_ur.tv_sec = seconds;
+ ctx->tunables.connect_ur.tv_sec = seconds;
#endif
STAT_UL(ctx);
@@ -3081,9 +3093,9 @@ static int mcplib_backend_retry_timeout(lua_State *L) {
proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
STAT_L(ctx);
- ctx->timeouts.retry.tv_sec = seconds;
+ ctx->tunables.retry.tv_sec = seconds;
#ifdef HAVE_LIBURING
- ctx->timeouts.retry_ur.tv_sec = seconds;
+ ctx->tunables.retry_ur.tv_sec = seconds;
#endif
STAT_UL(ctx);
@@ -3095,9 +3107,9 @@ static int mcplib_backend_read_timeout(lua_State *L) {
proxy_ctx_t *ctx = settings.proxy_ctx; // FIXME (v2): get global ctx reference in thread/upvalue.
STAT_L(ctx);
- ctx->timeouts.read.tv_sec = seconds;
+ ctx->tunables.read.tv_sec = seconds;
#ifdef HAVE_LIBURING
- ctx->timeouts.read_ur.tv_sec = seconds;
+ ctx->tunables.read_ur.tv_sec = seconds;
#endif
STAT_UL(ctx);
@@ -4329,6 +4341,7 @@ int proxy_register_libs(LIBEVENT_THREAD *t, void *ctx) {
{"backend_connect_timeout", mcplib_backend_connect_timeout},
{"backend_retry_timeout", mcplib_backend_retry_timeout},
{"backend_read_timeout", mcplib_backend_read_timeout},
+ {"backend_failure_limit", mcplib_backend_failure_limit},
{NULL, NULL}
};