summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-01-31 17:20:30 -0800
committerdormando <dormando@rydia.net>2023-02-01 14:28:29 -0800
commit58d8f40a90ee22d78adc0ae99f4b915e65f69be6 (patch)
tree4516a274f0f4a692db97ed235f400c4123534092
parent0d8ebbfb823c94e5ce68550c9ae186d8af789446 (diff)
downloadmemcached-58d8f40a90ee22d78adc0ae99f4b915e65f69be6.tar.gz
proxy: add mcp.backend(t) for more overrides
ie: local b1 = mcp.backend({ label = "b1", host = "127.0.0.1", port = 11511, connecttimeout = 1, retrytimeout = 0.5, readtimeout = 0.1, failurelimit = 11 }) ... to allow for overriding connect/retry/etc tunables on a per-backend basis. If not passed in the global settings are used.
-rw-r--r--proto_proxy.c2
-rw-r--r--proxy.h5
-rw-r--r--proxy_lua.c122
-rw-r--r--proxy_network.c100
-rw-r--r--t/proxyconfig.lua17
-rw-r--r--t/proxyconfig.t38
6 files changed, 186 insertions, 98 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index b09e113..feeddc9 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -175,8 +175,6 @@ void *proxy_init(bool use_uring) {
pthread_mutex_init(&t->mutex, NULL);
pthread_cond_init(&t->cond, NULL);
- memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
-
#ifdef HAVE_LIBURING
if (t->use_uring) {
pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
diff --git a/proxy.h b/proxy.h
index 6ce6ec8..171f695 100644
--- a/proxy.h
+++ b/proxy.h
@@ -308,6 +308,7 @@ struct mcp_backend_label_s {
char port[MAX_PORTLEN+1];
char label[MAX_LABELLEN+1];
size_t llen; // cache label length for small speedup in pool creation.
+ struct proxy_tunables tunables;
};
// lua object wrapper meant to own a malloc'ed conn structure
@@ -333,6 +334,7 @@ struct mcp_backend_s {
char *rbuf; // statically allocated read buffer.
size_t rbufused; // currently active bytes in the buffer
struct event event; // libevent
+ struct proxy_tunables tunables;
#ifdef HAVE_LIBURING
proxy_event_t ur_rd_ev; // liburing.
proxy_event_t ur_wr_ev; // need a separate event/cb for writing/polling
@@ -357,13 +359,11 @@ struct proxy_event_thread_s {
pthread_t thread_id;
struct event_base *base;
struct event notify_event; // listen event for the notify pipe/eventfd.
- struct event clock_event; // timer for updating event thread data.
struct event beconn_event; // listener for backends in connect state
#ifdef HAVE_LIBURING
struct io_uring ring;
proxy_event_t ur_notify_event; // listen on eventfd.
proxy_event_t ur_benotify_event; // listen on eventfd for backend connections.
- proxy_event_t ur_clock_event; // timer for updating event thread data.
eventfd_t event_counter;
eventfd_t beevent_counter;
bool use_uring;
@@ -383,7 +383,6 @@ struct proxy_event_thread_s {
int be_notify_send_fd;
#endif
proxy_ctx_t *ctx; // main context.
- struct proxy_tunables tunables; // periodically copied from main ctx
};
enum mcp_resp_mode {
diff --git a/proxy_lua.c b/proxy_lua.c
index 14f2564..f309515 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -2,6 +2,10 @@
#include "proxy.h"
+// sad, I had to look this up...
+#define NANOSECONDS(x) ((x) * 1E9 + 0.5)
+#define MICROSECONDS(x) ((x) * 1E6 + 0.5)
+
// func prototype example:
// static int fname (lua_State *L)
// normal library open:
@@ -140,13 +144,112 @@ static int mcplib_backend_gc(lua_State *L) {
// backend label object; given to pools which then find or create backend
// objects as necessary.
+// allow optionally passing a table of arguments for extended options:
+// { label = "etc", "host" = "127.0.0.1", port = "11211",
+// readtimeout = 0.5, connecttimeout = 1, retrytime = 3,
+// failurelimit = 3, tcpkeepalive = false }
static int mcplib_backend(lua_State *L) {
size_t llen = 0;
size_t nlen = 0;
size_t plen = 0;
- const char *label = luaL_checklstring(L, 1, &llen);
- const char *name = luaL_checklstring(L, 2, &nlen);
- const char *port = luaL_checklstring(L, 3, &plen);
+ proxy_ctx_t *ctx = settings.proxy_ctx;
+ mcp_backend_label_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_label_t), 0);
+ memset(be, 0, sizeof(*be));
+ const char *label;
+ const char *name;
+ const char *port;
+ // copy global defaults for tunables.
+ memcpy(&be->tunables, &ctx->tunables, sizeof(be->tunables));
+
+ if (lua_istable(L, 1)) {
+
+ // We don't pop the label/host/port strings so lua won't change them
+ // until after the function call.
+ if (lua_getfield(L, 1, "label") != LUA_TNIL) {
+ label = luaL_checklstring(L, -1, &llen);
+ } else {
+ proxy_lua_error(L, "backend must have a label argument");
+ return 0;
+ }
+
+ if (lua_getfield(L, 1, "host") != LUA_TNIL) {
+ name = luaL_checklstring(L, -1, &nlen);
+ } else {
+ proxy_lua_error(L, "backend must have a host argument");
+ return 0;
+ }
+
+ // TODO: allow a default port.
+ if (lua_getfield(L, 1, "port") != LUA_TNIL) {
+ port = luaL_checklstring(L, -1, &plen);
+ } else {
+ proxy_lua_error(L, "backend must have a port argument");
+ return 0;
+ }
+
+ if (lua_getfield(L, 1, "tcpkeepalive") != LUA_TNIL) {
+ be->tunables.tcp_keepalive = lua_toboolean(L, -1);
+ }
+ lua_pop(L, 1);
+
+ if (lua_getfield(L, 1, "failurelimit") != LUA_TNIL) {
+ int limit = luaL_checkinteger(L, -1);
+ if (limit < 0) {
+ proxy_lua_error(L, "failure_limit must be >= 0");
+ return 0;
+ }
+
+ be->tunables.backend_failure_limit = limit;
+ }
+ lua_pop(L, 1);
+
+ if (lua_getfield(L, 1, "connecttimeout") != LUA_TNIL) {
+ lua_Number secondsf = luaL_checknumber(L, -1);
+ lua_Integer secondsi = (lua_Integer) secondsf;
+ lua_Number subseconds = secondsf - secondsi;
+
+ be->tunables.connect.tv_sec = secondsi;
+ be->tunables.connect.tv_usec = MICROSECONDS(subseconds);
+#ifdef HAVE_LIBURING
+ be->tunables.connect_ur.tv_sec = secondsi;
+ be->tunables.connect_ur.tv_nsec = NANOSECONDS(subseconds);
+#endif
+ }
+ lua_pop(L, 1);
+
+ if (lua_getfield(L, 1, "retrytimeout") != LUA_TNIL) {
+ lua_Number secondsf = luaL_checknumber(L, -1);
+ lua_Integer secondsi = (lua_Integer) secondsf;
+ lua_Number subseconds = secondsf - secondsi;
+
+ be->tunables.retry.tv_sec = secondsi;
+ be->tunables.retry.tv_usec = MICROSECONDS(subseconds);
+#ifdef HAVE_LIBURING
+ be->tunables.retry_ur.tv_sec = secondsi;
+ be->tunables.retry_ur.tv_nsec = NANOSECONDS(subseconds);
+#endif
+ }
+ lua_pop(L, 1);
+
+ if (lua_getfield(L, 1, "readtimeout") != LUA_TNIL) {
+ lua_Number secondsf = luaL_checknumber(L, -1);
+ lua_Integer secondsi = (lua_Integer) secondsf;
+ lua_Number subseconds = secondsf - secondsi;
+
+ be->tunables.read.tv_sec = secondsi;
+ be->tunables.read.tv_usec = MICROSECONDS(subseconds);
+#ifdef HAVE_LIBURING
+ be->tunables.read_ur.tv_sec = secondsi;
+ be->tunables.read_ur.tv_nsec = NANOSECONDS(subseconds);
+#endif
+ }
+ lua_pop(L, 1);
+
+ } else {
+ label = luaL_checklstring(L, 1, &llen);
+ name = luaL_checklstring(L, 2, &nlen);
+ port = luaL_checklstring(L, 3, &plen);
+ }
if (llen > MAX_LABELLEN-1) {
proxy_lua_error(L, "backend label too long");
@@ -163,8 +266,6 @@ static int mcplib_backend(lua_State *L) {
return 0;
}
- mcp_backend_label_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_label_t), 0);
- memset(be, 0, sizeof(*be));
memcpy(be->label, label, llen);
be->label[llen] = '\0';
memcpy(be->name, name, nlen);
@@ -172,6 +273,9 @@ static int mcplib_backend(lua_State *L) {
memcpy(be->port, port, plen);
be->port[plen] = '\0';
be->llen = llen;
+ if (lua_istable(L, 1)) {
+ lua_pop(L, 3); // drop label, name, port.
+ }
luaL_getmetatable(L, "mcp.backend");
lua_setmetatable(L, -2); // set metatable to userdata.
@@ -187,7 +291,8 @@ static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_
if (ret != LUA_TNIL) {
mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap");
if (strncmp(be_orig->be->name, bel->name, MAX_NAMELEN) == 0
- && strncmp(be_orig->be->port, bel->port, MAX_PORTLEN) == 0) {
+ && strncmp(be_orig->be->port, bel->port, MAX_PORTLEN) == 0
+ && memcmp(&be_orig->be->tunables, &bel->tunables, sizeof(bel->tunables)) == 0) {
// backend is the same, return it.
return be_orig;
} else {
@@ -218,6 +323,7 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la
strncpy(be->name, bel->name, MAX_NAMELEN+1);
strncpy(be->port, bel->port, MAX_PORTLEN+1);
+ memcpy(&be->tunables, &bel->tunables, sizeof(bel->tunables));
STAILQ_INIT(&be->io_head);
be->state = mcp_backend_read;
@@ -628,10 +734,6 @@ static int mcplib_backend_failure_limit(lua_State *L) {
return 0;
}
-// sad, I had to look this up...
-#define NANOSECONDS(x) ((x) * 1E9 + 0.5)
-#define MICROSECONDS(x) ((x) * 1E6 + 0.5)
-
static int mcplib_backend_connect_timeout(lua_State *L) {
lua_Number secondsf = luaL_checknumber(L, -1);
lua_Integer secondsi = (lua_Integer) secondsf;
diff --git a/proxy_network.c b/proxy_network.c
index d93dd6f..d893f3f 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -40,7 +40,6 @@ static void proxy_backend_handler(const int fd, const short which, void *arg);
static void proxy_beconn_handler(const int fd, const short which, void *arg);
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg);
-static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
static int _prep_pending_write(mcp_backend_t *be);
static bool _post_pending_write(mcp_backend_t *be, ssize_t sent);
static int _flush_pending_write(mcp_backend_t *be);
@@ -171,10 +170,7 @@ static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timesp
static void _proxy_evthr_evset_be_readvalidate(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t);
static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t);
-static void _proxy_evthr_evset_clock(proxy_event_thread_t *t);
-static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe);
static void _backend_failed_ur(mcp_backend_t *be);
-struct __kernel_timespec updater_ts = {.tv_sec = 3, .tv_nsec = 0};
static void _flush_pending_write_ur(mcp_backend_t *be) {
// Allow us to be called with an empty stack to prevent dev errors.
@@ -185,7 +181,7 @@ static void _flush_pending_write_ur(mcp_backend_t *be) {
int iovcnt = _prep_pending_write(be);
// TODO: write timeout.
- _proxy_evthr_evset_be_writev(be, iovcnt, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_writev(be, iovcnt, &be->tunables.read_ur);
}
// TODO: we shouldn't handle reads if a write is pending, so postwrite should
@@ -200,7 +196,7 @@ static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) {
// FIXME: sent == 0 is disconnected? I keep forgetting.
if (sent == -EAGAIN || sent == -EWOULDBLOCK) {
// didn't do any writing, wait for a writeable socket.
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_wrpoll(be, &be->tunables.read_ur);
} else {
_reset_bad_backend(be, P_BE_FAIL_WRITING);
_backend_failed_ur(be);
@@ -209,31 +205,19 @@ static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) {
if (_post_pending_write(be, sent)) {
// commands were flushed, set read handler.
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur);
}
if (be->io_next) {
// still have unflushed commands, re-run write command.
// writev can't "block if EAGAIN" in io_uring so far as I can tell, so
// we have to switch to polling mode here.
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_wrpoll(be, &be->tunables.read_ur);
}
// TODO: if rbufused != 0, push through drive machine?
}
-static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) {
- proxy_event_thread_t *t = udata;
- proxy_ctx_t *ctx = t->ctx;
-
- _proxy_evthr_evset_clock(t);
-
- // we reuse the "global stats" lock since it's hardly ever used.
- STAT_L(ctx);
- memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
- STAT_UL(ctx);
-}
-
// No-op at the moment. when the linked timeout fires uring returns the
// linked request (read/write/poll/etc) with an interrupted/timeout/cancelled
// error. So we don't need to explicitly handle timeouts.
@@ -245,7 +229,7 @@ static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *c
static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) {
mcp_backend_t *be = udata;
- _proxy_evthr_evset_be_conn(be, &be->event_thread->tunables.connect_ur);
+ _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur);
}
static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
@@ -259,19 +243,19 @@ static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
sqe = io_uring_get_sqe(&be->event_thread->ring);
// TODO (v2): NULL?
- io_uring_prep_timeout(sqe, &be->event_thread->tunables.retry_ur, 0, 0);
+ io_uring_prep_timeout(sqe, &be->tunables.retry_ur, 0, 0);
io_uring_sqe_set_data(sqe, &be->ur_te_ev);
be->ur_te_ev.set = true;
}
static void _backend_failed_ur(mcp_backend_t *be) {
- if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
+ if (++be->failed_count > be->tunables.backend_failure_limit) {
P_DEBUG("%s: marking backend as bad\n", __func__);
be->bad = true;
_proxy_evthr_evset_be_retry(be);
STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
} else {
- _proxy_evthr_evset_be_conn(be, &be->event_thread->tunables.connect_ur);
+ _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur);
STAT_INCR(be->event_thread->ctx, backend_failed, 1);
}
}
@@ -298,7 +282,7 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
// TODO (v2): when exactly do we need to reset the backend handler?
if (!STAILQ_EMPTY(&be->io_head)) {
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur);
}
}
@@ -308,7 +292,7 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
be->can_write = true;
_flush_pending_write_ur(be);
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur);
}
// a backend with an outstanding new connection has become writeable.
@@ -352,7 +336,7 @@ static void proxy_backend_beconn_ur(void *udata, struct io_uring_cqe *cqe) {
// TODO: make validation optional.
// set next handler on recv for validity check.
- _proxy_evthr_evset_be_readvalidate(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_readvalidate(be, be->rbuf, READ_BUFFER_SIZE, &be->tunables.read_ur);
}
// TODO: share more code with proxy_beconn_handler
@@ -443,7 +427,7 @@ static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) {
// will auto-wake because the socket is writeable.
be->connecting = true;
be->can_write = false;
- _proxy_evthr_evset_be_conn(be, &t->tunables.connect_ur);
+ _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur);
} else {
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed_ur(be);
@@ -654,17 +638,6 @@ static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len,
io_uring_sqe_set_data(sqe, NULL);
}*/
-static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) {
- struct io_uring_sqe *sqe;
-
- sqe = io_uring_get_sqe(&t->ring);
- // FIXME (v2): NULL?
-
- io_uring_prep_timeout(sqe, &updater_ts, 0, 0);
- io_uring_sqe_set_data(sqe, &t->ur_clock_event);
- t->ur_clock_event.set = true;
-}
-
static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t) {
struct io_uring_sqe *sqe;
P_DEBUG("%s: setting: %d\n", __func__, t->ur_benotify_event.set);
@@ -745,28 +718,6 @@ void *proxy_event_thread_ur(void *arg) {
}
#endif // HAVE_LIBURING
-// We need to get timeout/retry/etc updates to the event thread(s)
-// occasionally. I'd like to have a better inteface around this where updates
-// are shipped directly; but this is good enough to start with.
-static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
- proxy_event_thread_t *t = arg;
- proxy_ctx_t *ctx = t->ctx;
-
- // TODO (v2): double check how much of this boilerplate is still necessary?
- // reschedule the clock event.
- evtimer_del(&t->clock_event);
-
- evtimer_set(&t->clock_event, proxy_event_updater, t);
- event_base_set(t->base, &t->clock_event);
- struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
- evtimer_add(&t->clock_event, &rate);
-
- // we reuse the "global stats" lock since it's hardly ever used.
- STAT_L(ctx);
- memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
- STAT_UL(ctx);
-}
-
static void _cleanup_backend(mcp_backend_t *be) {
#ifdef HAVE_LIBURING
if (be->event_thread->use_uring) {
@@ -817,7 +768,6 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
#endif
beconn_head_t head;
- struct timeval tmp_time = t->tunables.connect;
STAILQ_INIT(&head);
pthread_mutex_lock(&t->mutex);
@@ -850,7 +800,7 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
// will auto-wake because the socket is writeable.
be->connecting = true;
be->can_write = false;
- _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
+ _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, be->tunables.connect, proxy_beconn_handler);
} else {
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed(be);
@@ -891,7 +841,6 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
// Re-walk each backend and check set event as required.
mcp_backend_t *be = NULL;
- struct timeval tmp_time = t->tunables.read;
// FIXME (v2): _set_event() is buggy, see notes on function.
STAILQ_FOREACH(be, &t->be_head, be_next) {
@@ -908,7 +857,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
_backend_failed(be);
} else {
flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT;
- _set_event(be, t->base, flags, tmp_time, proxy_backend_handler);
+ _set_event(be, t->base, flags, be->tunables.read, proxy_backend_handler);
}
}
}
@@ -1205,7 +1154,7 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
static void proxy_backend_retry_handler(const int fd, const short which, void *arg) {
mcp_backend_t *be = arg;
assert(which & EV_TIMEOUT);
- struct timeval tmp_time = be->event_thread->tunables.retry;
+ struct timeval tmp_time = be->tunables.retry;
_set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
}
@@ -1223,8 +1172,8 @@ static void proxy_backend_retry_handler(const int fd, const short which, void *a
// block them instead. That's more challenging so leaving a note instead
// of doing this now :)
static void _backend_failed(mcp_backend_t *be) {
- struct timeval tmp_time = be->event_thread->tunables.retry;
- if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
+ struct timeval tmp_time = be->tunables.retry;
+ if (++be->failed_count > be->tunables.backend_failure_limit) {
P_DEBUG("%s: marking backend as bad\n", __func__);
be->bad = true;
_set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler);
@@ -1404,7 +1353,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
assert(arg != NULL);
mcp_backend_t *be = arg;
int flags = EV_TIMEOUT;
- struct timeval tmp_time = be->event_thread->tunables.read;
+ struct timeval tmp_time = be->tunables.read;
if (which & EV_TIMEOUT) {
P_DEBUG("%s: backend timed out while connecting\n", __func__);
@@ -1504,7 +1453,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
static void proxy_backend_handler(const int fd, const short which, void *arg) {
mcp_backend_t *be = arg;
int flags = EV_TIMEOUT;
- struct timeval tmp_time = be->event_thread->tunables.read;
+ struct timeval tmp_time = be->tunables.read;
if (which & EV_TIMEOUT) {
P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
@@ -1616,12 +1565,6 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
t->ur_benotify_event.set = false;
_proxy_evthr_evset_benotifier(t);
- // periodic data updater for event thread
- t->ur_clock_event.cb = proxy_event_updater_ur;
- t->ur_clock_event.udata = t;
- t->ur_clock_event.set = false;
- _proxy_evthr_evset_clock(t);
-
t->use_uring = true;
return;
} else {
@@ -1659,11 +1602,6 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
EV_READ | EV_PERSIST, proxy_event_beconn, t);
#endif
- evtimer_set(&t->clock_event, proxy_event_updater, t);
- event_base_set(t->base, &t->clock_event);
- struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
- evtimer_add(&t->clock_event, &rate);
-
event_base_set(t->base, &t->notify_event);
if (event_add(&t->notify_event, 0) == -1) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
diff --git a/t/proxyconfig.lua b/t/proxyconfig.lua
index 18a35de..5f9ad0e 100644
--- a/t/proxyconfig.lua
+++ b/t/proxyconfig.lua
@@ -2,6 +2,8 @@
-- so we can modify ourselves.
local mode = dofile("/tmp/proxyconfigmode.lua")
+mcp.backend_read_timeout(4)
+
function mcp_config_pools(old)
if mode == "none" then
return {}
@@ -14,6 +16,19 @@ function mcp_config_pools(old)
test = mcp.pool({b1, b2, b3})
}
return pools
+ elseif mode == "betable" then
+ local b1 = mcp.backend({ label = "b1", host = "127.0.0.1", port = 11511,
+ connecttimeout = 2, retrytimeout = 5, readtimeout = 0.1,
+ failurelimit = 0 })
+ local b2 = mcp.backend({ label = "b2", host = "127.0.0.1", port = 11512,
+ connecttimeout = 2, retrytimeout = 5, readtimeout = 5 })
+ local b3 = mcp.backend({ label = "b3", host = "127.0.0.1", port = 11513,
+ connecttimeout = 5, retrytimeout = 5, readtimeout = 5 })
+
+ local pools = {
+ test = mcp.pool({b1, b2, b3})
+ }
+ return pools
end
end
@@ -24,7 +39,7 @@ function mcp_config_routes(zones)
if mode == "none" then
mcp.attach(mcp.CMD_MG, function(r) return "SERVER_ERROR no mg route\r\n" end)
mcp.attach(mcp.CMD_MS, function(r) return "SERVER_ERROR no ms route\r\n" end)
- elseif mode == "start" then
+ elseif mode == "start" or mode == "betable" then
mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end)
mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end)
end
diff --git a/t/proxyconfig.t b/t/proxyconfig.t
index 2380997..9561415 100644
--- a/t/proxyconfig.t
+++ b/t/proxyconfig.t
@@ -119,7 +119,7 @@ is(<$watcher>, "OK\r\n", "watcher enabled");
# Try sending something.
my $cmd = "mg foo v\r\n";
print $ps $cmd;
- my @readable = $s->can_read(1);
+ my @readable = $s->can_read(0.25);
is(scalar @readable, 1, "only one backend became readable");
my $be = shift @readable;
is(scalar <$be>, $cmd, "metaget passthrough");
@@ -127,6 +127,42 @@ is(<$watcher>, "OK\r\n", "watcher enabled");
is(scalar <$ps>, "EN\r\n", "miss received");
}
+# Test backend table arguments and per-backend time overrides
+{
+ # This should create three new backend sockets
+ write_modefile('return "betable"');
+ $p_srv->reload();
+ wait_reload($watcher);
+
+ # sleep a short time; b1 should have a very short timeout and the
+ # others are long.
+ select(undef, undef, undef, 0.5);
+
+ my $s = IO::Select->new();
+ for my $msrv (@mocksrvs) {
+ $s->add($msrv);
+ }
+ my @readable = $s->can_read(0.25);
+ # All three backends should have changed despite having the same label,
+ # host, and port arguments.
+ is(scalar @readable, 3, "all listeners became readable");
+
+ like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_backend error=timeout name=\S+ port=11511/, "one backend timed out connecting");
+
+ for my $msrv (@readable) {
+ my $be = $msrv->accept();
+ ok(defined $be, "mock backend accepted");
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ }
+
+ # reload again and ensure only the bad socket became available.
+ $p_srv->reload();
+ wait_reload($watcher);
+ @readable = $s->can_read(0.5);
+ is(scalar @readable, 1, "only one listener became readable");
+}
+
# TODO:
# remove backends
# do dead sockets close?