From 58d8f40a90ee22d78adc0ae99f4b915e65f69be6 Mon Sep 17 00:00:00 2001 From: dormando Date: Tue, 31 Jan 2023 17:20:30 -0800 Subject: proxy: add mcp.backend(t) for more overrides ie: local b1 = mcp.backend({ label = "b1", host = "127.0.0.1", port = 11511, connecttimeout = 1, retrytimeout = 0.5, readtimeout = 0.1, failurelimit = 11 }) ... to allow for overriding connect/retry/etc tunables on a per-backend basis. If not passed in the global settings are used. --- proto_proxy.c | 2 - proxy.h | 5 +-- proxy_lua.c | 122 +++++++++++++++++++++++++++++++++++++++++++++++++----- proxy_network.c | 100 +++++++++----------------------------------- t/proxyconfig.lua | 17 +++++++- t/proxyconfig.t | 38 ++++++++++++++++- 6 files changed, 186 insertions(+), 98 deletions(-) diff --git a/proto_proxy.c b/proto_proxy.c index b09e113..feeddc9 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -175,8 +175,6 @@ void *proxy_init(bool use_uring) { pthread_mutex_init(&t->mutex, NULL); pthread_cond_init(&t->cond, NULL); - memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables)); - #ifdef HAVE_LIBURING if (t->use_uring) { pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t); diff --git a/proxy.h b/proxy.h index 6ce6ec8..171f695 100644 --- a/proxy.h +++ b/proxy.h @@ -308,6 +308,7 @@ struct mcp_backend_label_s { char port[MAX_PORTLEN+1]; char label[MAX_LABELLEN+1]; size_t llen; // cache label length for small speedup in pool creation. + struct proxy_tunables tunables; }; // lua object wrapper meant to own a malloc'ed conn structure @@ -333,6 +334,7 @@ struct mcp_backend_s { char *rbuf; // statically allocated read buffer. size_t rbufused; // currently active bytes in the buffer struct event event; // libevent + struct proxy_tunables tunables; #ifdef HAVE_LIBURING proxy_event_t ur_rd_ev; // liburing. proxy_event_t ur_wr_ev; // need a separate event/cb for writing/polling @@ -357,13 +359,11 @@ struct proxy_event_thread_s { pthread_t thread_id; struct event_base *base; struct event notify_event; // listen event for the notify pipe/eventfd. - struct event clock_event; // timer for updating event thread data. struct event beconn_event; // listener for backends in connect state #ifdef HAVE_LIBURING struct io_uring ring; proxy_event_t ur_notify_event; // listen on eventfd. proxy_event_t ur_benotify_event; // listen on eventfd for backend connections. - proxy_event_t ur_clock_event; // timer for updating event thread data. eventfd_t event_counter; eventfd_t beevent_counter; bool use_uring; @@ -383,7 +383,6 @@ struct proxy_event_thread_s { int be_notify_send_fd; #endif proxy_ctx_t *ctx; // main context. - struct proxy_tunables tunables; // periodically copied from main ctx }; enum mcp_resp_mode { diff --git a/proxy_lua.c b/proxy_lua.c index 14f2564..f309515 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -2,6 +2,10 @@ #include "proxy.h" +// sad, I had to look this up... +#define NANOSECONDS(x) ((x) * 1E9 + 0.5) +#define MICROSECONDS(x) ((x) * 1E6 + 0.5) + // func prototype example: // static int fname (lua_State *L) // normal library open: @@ -140,13 +144,112 @@ static int mcplib_backend_gc(lua_State *L) { // backend label object; given to pools which then find or create backend // objects as necessary. +// allow optionally passing a table of arguments for extended options: +// { label = "etc", "host" = "127.0.0.1", port = "11211", +// readtimeout = 0.5, connecttimeout = 1, retrytime = 3, +// failurelimit = 3, tcpkeepalive = false } static int mcplib_backend(lua_State *L) { size_t llen = 0; size_t nlen = 0; size_t plen = 0; - const char *label = luaL_checklstring(L, 1, &llen); - const char *name = luaL_checklstring(L, 2, &nlen); - const char *port = luaL_checklstring(L, 3, &plen); + proxy_ctx_t *ctx = settings.proxy_ctx; + mcp_backend_label_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_label_t), 0); + memset(be, 0, sizeof(*be)); + const char *label; + const char *name; + const char *port; + // copy global defaults for tunables. + memcpy(&be->tunables, &ctx->tunables, sizeof(be->tunables)); + + if (lua_istable(L, 1)) { + + // We don't pop the label/host/port strings so lua won't change them + // until after the function call. + if (lua_getfield(L, 1, "label") != LUA_TNIL) { + label = luaL_checklstring(L, -1, &llen); + } else { + proxy_lua_error(L, "backend must have a label argument"); + return 0; + } + + if (lua_getfield(L, 1, "host") != LUA_TNIL) { + name = luaL_checklstring(L, -1, &nlen); + } else { + proxy_lua_error(L, "backend must have a host argument"); + return 0; + } + + // TODO: allow a default port. + if (lua_getfield(L, 1, "port") != LUA_TNIL) { + port = luaL_checklstring(L, -1, &plen); + } else { + proxy_lua_error(L, "backend must have a port argument"); + return 0; + } + + if (lua_getfield(L, 1, "tcpkeepalive") != LUA_TNIL) { + be->tunables.tcp_keepalive = lua_toboolean(L, -1); + } + lua_pop(L, 1); + + if (lua_getfield(L, 1, "failurelimit") != LUA_TNIL) { + int limit = luaL_checkinteger(L, -1); + if (limit < 0) { + proxy_lua_error(L, "failure_limit must be >= 0"); + return 0; + } + + be->tunables.backend_failure_limit = limit; + } + lua_pop(L, 1); + + if (lua_getfield(L, 1, "connecttimeout") != LUA_TNIL) { + lua_Number secondsf = luaL_checknumber(L, -1); + lua_Integer secondsi = (lua_Integer) secondsf; + lua_Number subseconds = secondsf - secondsi; + + be->tunables.connect.tv_sec = secondsi; + be->tunables.connect.tv_usec = MICROSECONDS(subseconds); +#ifdef HAVE_LIBURING + be->tunables.connect_ur.tv_sec = secondsi; + be->tunables.connect_ur.tv_nsec = NANOSECONDS(subseconds); +#endif + } + lua_pop(L, 1); + + if (lua_getfield(L, 1, "retrytimeout") != LUA_TNIL) { + lua_Number secondsf = luaL_checknumber(L, -1); + lua_Integer secondsi = (lua_Integer) secondsf; + lua_Number subseconds = secondsf - secondsi; + + be->tunables.retry.tv_sec = secondsi; + be->tunables.retry.tv_usec = MICROSECONDS(subseconds); +#ifdef HAVE_LIBURING + be->tunables.retry_ur.tv_sec = secondsi; + be->tunables.retry_ur.tv_nsec = NANOSECONDS(subseconds); +#endif + } + lua_pop(L, 1); + + if (lua_getfield(L, 1, "readtimeout") != LUA_TNIL) { + lua_Number secondsf = luaL_checknumber(L, -1); + lua_Integer secondsi = (lua_Integer) secondsf; + lua_Number subseconds = secondsf - secondsi; + + be->tunables.read.tv_sec = secondsi; + be->tunables.read.tv_usec = MICROSECONDS(subseconds); +#ifdef HAVE_LIBURING + be->tunables.read_ur.tv_sec = secondsi; + be->tunables.read_ur.tv_nsec = NANOSECONDS(subseconds); +#endif + } + lua_pop(L, 1); + + } else { + label = luaL_checklstring(L, 1, &llen); + name = luaL_checklstring(L, 2, &nlen); + port = luaL_checklstring(L, 3, &plen); + } if (llen > MAX_LABELLEN-1) { proxy_lua_error(L, "backend label too long"); @@ -163,8 +266,6 @@ static int mcplib_backend(lua_State *L) { return 0; } - mcp_backend_label_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_label_t), 0); - memset(be, 0, sizeof(*be)); memcpy(be->label, label, llen); be->label[llen] = '\0'; memcpy(be->name, name, nlen); @@ -172,6 +273,9 @@ static int mcplib_backend(lua_State *L) { memcpy(be->port, port, plen); be->port[plen] = '\0'; be->llen = llen; + if (lua_istable(L, 1)) { + lua_pop(L, 3); // drop label, name, port. + } luaL_getmetatable(L, "mcp.backend"); lua_setmetatable(L, -2); // set metatable to userdata. @@ -187,7 +291,8 @@ static mcp_backend_wrap_t *_mcplib_backend_checkcache(lua_State *L, mcp_backend_ if (ret != LUA_TNIL) { mcp_backend_wrap_t *be_orig = luaL_checkudata(L, -1, "mcp.backendwrap"); if (strncmp(be_orig->be->name, bel->name, MAX_NAMELEN) == 0 - && strncmp(be_orig->be->port, bel->port, MAX_PORTLEN) == 0) { + && strncmp(be_orig->be->port, bel->port, MAX_PORTLEN) == 0 + && memcmp(&be_orig->be->tunables, &bel->tunables, sizeof(bel->tunables)) == 0) { // backend is the same, return it. return be_orig; } else { @@ -218,6 +323,7 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la strncpy(be->name, bel->name, MAX_NAMELEN+1); strncpy(be->port, bel->port, MAX_PORTLEN+1); + memcpy(&be->tunables, &bel->tunables, sizeof(bel->tunables)); STAILQ_INIT(&be->io_head); be->state = mcp_backend_read; @@ -628,10 +734,6 @@ static int mcplib_backend_failure_limit(lua_State *L) { return 0; } -// sad, I had to look this up... -#define NANOSECONDS(x) ((x) * 1E9 + 0.5) -#define MICROSECONDS(x) ((x) * 1E6 + 0.5) - static int mcplib_backend_connect_timeout(lua_State *L) { lua_Number secondsf = luaL_checknumber(L, -1); lua_Integer secondsi = (lua_Integer) secondsf; diff --git a/proxy_network.c b/proxy_network.c index d93dd6f..d893f3f 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -40,7 +40,6 @@ static void proxy_backend_handler(const int fd, const short which, void *arg); static void proxy_beconn_handler(const int fd, const short which, void *arg); static void proxy_event_handler(evutil_socket_t fd, short which, void *arg); static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg); -static void proxy_event_updater(evutil_socket_t fd, short which, void *arg); static int _prep_pending_write(mcp_backend_t *be); static bool _post_pending_write(mcp_backend_t *be, ssize_t sent); static int _flush_pending_write(mcp_backend_t *be); @@ -171,10 +170,7 @@ static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timesp static void _proxy_evthr_evset_be_readvalidate(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts); static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t); static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t); -static void _proxy_evthr_evset_clock(proxy_event_thread_t *t); -static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe); static void _backend_failed_ur(mcp_backend_t *be); -struct __kernel_timespec updater_ts = {.tv_sec = 3, .tv_nsec = 0}; static void _flush_pending_write_ur(mcp_backend_t *be) { // Allow us to be called with an empty stack to prevent dev errors. @@ -185,7 +181,7 @@ static void _flush_pending_write_ur(mcp_backend_t *be) { int iovcnt = _prep_pending_write(be); // TODO: write timeout. - _proxy_evthr_evset_be_writev(be, iovcnt, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_writev(be, iovcnt, &be->tunables.read_ur); } // TODO: we shouldn't handle reads if a write is pending, so postwrite should @@ -200,7 +196,7 @@ static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) { // FIXME: sent == 0 is disconnected? I keep forgetting. if (sent == -EAGAIN || sent == -EWOULDBLOCK) { // didn't do any writing, wait for a writeable socket. - _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_wrpoll(be, &be->tunables.read_ur); } else { _reset_bad_backend(be, P_BE_FAIL_WRITING); _backend_failed_ur(be); @@ -209,31 +205,19 @@ static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) { if (_post_pending_write(be, sent)) { // commands were flushed, set read handler. - _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur); } if (be->io_next) { // still have unflushed commands, re-run write command. // writev can't "block if EAGAIN" in io_uring so far as I can tell, so // we have to switch to polling mode here. - _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_wrpoll(be, &be->tunables.read_ur); } // TODO: if rbufused != 0, push through drive machine? } -static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) { - proxy_event_thread_t *t = udata; - proxy_ctx_t *ctx = t->ctx; - - _proxy_evthr_evset_clock(t); - - // we reuse the "global stats" lock since it's hardly ever used. - STAT_L(ctx); - memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables)); - STAT_UL(ctx); -} - // No-op at the moment. when the linked timeout fires uring returns the // linked request (read/write/poll/etc) with an interrupted/timeout/cancelled // error. So we don't need to explicitly handle timeouts. @@ -245,7 +229,7 @@ static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *c static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) { mcp_backend_t *be = udata; - _proxy_evthr_evset_be_conn(be, &be->event_thread->tunables.connect_ur); + _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur); } static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) { @@ -259,19 +243,19 @@ static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) { sqe = io_uring_get_sqe(&be->event_thread->ring); // TODO (v2): NULL? - io_uring_prep_timeout(sqe, &be->event_thread->tunables.retry_ur, 0, 0); + io_uring_prep_timeout(sqe, &be->tunables.retry_ur, 0, 0); io_uring_sqe_set_data(sqe, &be->ur_te_ev); be->ur_te_ev.set = true; } static void _backend_failed_ur(mcp_backend_t *be) { - if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) { + if (++be->failed_count > be->tunables.backend_failure_limit) { P_DEBUG("%s: marking backend as bad\n", __func__); be->bad = true; _proxy_evthr_evset_be_retry(be); STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1); } else { - _proxy_evthr_evset_be_conn(be, &be->event_thread->tunables.connect_ur); + _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur); STAT_INCR(be->event_thread->ctx, backend_failed, 1); } } @@ -298,7 +282,7 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) { // TODO (v2): when exactly do we need to reset the backend handler? if (!STAILQ_EMPTY(&be->io_head)) { - _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur); } } @@ -308,7 +292,7 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) { be->can_write = true; _flush_pending_write_ur(be); - _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur); } // a backend with an outstanding new connection has become writeable. @@ -352,7 +336,7 @@ static void proxy_backend_beconn_ur(void *udata, struct io_uring_cqe *cqe) { // TODO: make validation optional. // set next handler on recv for validity check. - _proxy_evthr_evset_be_readvalidate(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_readvalidate(be, be->rbuf, READ_BUFFER_SIZE, &be->tunables.read_ur); } // TODO: share more code with proxy_beconn_handler @@ -443,7 +427,7 @@ static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) { // will auto-wake because the socket is writeable. be->connecting = true; be->can_write = false; - _proxy_evthr_evset_be_conn(be, &t->tunables.connect_ur); + _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur); } else { _reset_bad_backend(be, P_BE_FAIL_CONNECTING); _backend_failed_ur(be); @@ -654,17 +638,6 @@ static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, io_uring_sqe_set_data(sqe, NULL); }*/ -static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) { - struct io_uring_sqe *sqe; - - sqe = io_uring_get_sqe(&t->ring); - // FIXME (v2): NULL? - - io_uring_prep_timeout(sqe, &updater_ts, 0, 0); - io_uring_sqe_set_data(sqe, &t->ur_clock_event); - t->ur_clock_event.set = true; -} - static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t) { struct io_uring_sqe *sqe; P_DEBUG("%s: setting: %d\n", __func__, t->ur_benotify_event.set); @@ -745,28 +718,6 @@ void *proxy_event_thread_ur(void *arg) { } #endif // HAVE_LIBURING -// We need to get timeout/retry/etc updates to the event thread(s) -// occasionally. I'd like to have a better inteface around this where updates -// are shipped directly; but this is good enough to start with. -static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) { - proxy_event_thread_t *t = arg; - proxy_ctx_t *ctx = t->ctx; - - // TODO (v2): double check how much of this boilerplate is still necessary? - // reschedule the clock event. - evtimer_del(&t->clock_event); - - evtimer_set(&t->clock_event, proxy_event_updater, t); - event_base_set(t->base, &t->clock_event); - struct timeval rate = {.tv_sec = 3, .tv_usec = 0}; - evtimer_add(&t->clock_event, &rate); - - // we reuse the "global stats" lock since it's hardly ever used. - STAT_L(ctx); - memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables)); - STAT_UL(ctx); -} - static void _cleanup_backend(mcp_backend_t *be) { #ifdef HAVE_LIBURING if (be->event_thread->use_uring) { @@ -817,7 +768,6 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) { #endif beconn_head_t head; - struct timeval tmp_time = t->tunables.connect; STAILQ_INIT(&head); pthread_mutex_lock(&t->mutex); @@ -850,7 +800,7 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) { // will auto-wake because the socket is writeable. be->connecting = true; be->can_write = false; - _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); + _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, be->tunables.connect, proxy_beconn_handler); } else { _reset_bad_backend(be, P_BE_FAIL_CONNECTING); _backend_failed(be); @@ -891,7 +841,6 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { // Re-walk each backend and check set event as required. mcp_backend_t *be = NULL; - struct timeval tmp_time = t->tunables.read; // FIXME (v2): _set_event() is buggy, see notes on function. STAILQ_FOREACH(be, &t->be_head, be_next) { @@ -908,7 +857,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { _backend_failed(be); } else { flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT; - _set_event(be, t->base, flags, tmp_time, proxy_backend_handler); + _set_event(be, t->base, flags, be->tunables.read, proxy_backend_handler); } } } @@ -1205,7 +1154,7 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { static void proxy_backend_retry_handler(const int fd, const short which, void *arg) { mcp_backend_t *be = arg; assert(which & EV_TIMEOUT); - struct timeval tmp_time = be->event_thread->tunables.retry; + struct timeval tmp_time = be->tunables.retry; _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); } @@ -1223,8 +1172,8 @@ static void proxy_backend_retry_handler(const int fd, const short which, void *a // block them instead. That's more challenging so leaving a note instead // of doing this now :) static void _backend_failed(mcp_backend_t *be) { - struct timeval tmp_time = be->event_thread->tunables.retry; - if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) { + struct timeval tmp_time = be->tunables.retry; + if (++be->failed_count > be->tunables.backend_failure_limit) { P_DEBUG("%s: marking backend as bad\n", __func__); be->bad = true; _set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler); @@ -1404,7 +1353,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { assert(arg != NULL); mcp_backend_t *be = arg; int flags = EV_TIMEOUT; - struct timeval tmp_time = be->event_thread->tunables.read; + struct timeval tmp_time = be->tunables.read; if (which & EV_TIMEOUT) { P_DEBUG("%s: backend timed out while connecting\n", __func__); @@ -1504,7 +1453,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { static void proxy_backend_handler(const int fd, const short which, void *arg) { mcp_backend_t *be = arg; int flags = EV_TIMEOUT; - struct timeval tmp_time = be->event_thread->tunables.read; + struct timeval tmp_time = be->tunables.read; if (which & EV_TIMEOUT) { P_DEBUG("%s: timeout received, killing backend queue\n", __func__); @@ -1616,12 +1565,6 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) { t->ur_benotify_event.set = false; _proxy_evthr_evset_benotifier(t); - // periodic data updater for event thread - t->ur_clock_event.cb = proxy_event_updater_ur; - t->ur_clock_event.udata = t; - t->ur_clock_event.set = false; - _proxy_evthr_evset_clock(t); - t->use_uring = true; return; } else { @@ -1659,11 +1602,6 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) { EV_READ | EV_PERSIST, proxy_event_beconn, t); #endif - evtimer_set(&t->clock_event, proxy_event_updater, t); - event_base_set(t->base, &t->clock_event); - struct timeval rate = {.tv_sec = 3, .tv_usec = 0}; - evtimer_add(&t->clock_event, &rate); - event_base_set(t->base, &t->notify_event); if (event_add(&t->notify_event, 0) == -1) { fprintf(stderr, "Can't monitor libevent notify pipe\n"); diff --git a/t/proxyconfig.lua b/t/proxyconfig.lua index 18a35de..5f9ad0e 100644 --- a/t/proxyconfig.lua +++ b/t/proxyconfig.lua @@ -2,6 +2,8 @@ -- so we can modify ourselves. local mode = dofile("/tmp/proxyconfigmode.lua") +mcp.backend_read_timeout(4) + function mcp_config_pools(old) if mode == "none" then return {} @@ -10,6 +12,19 @@ function mcp_config_pools(old) local b2 = mcp.backend('b2', '127.0.0.1', 11512) local b3 = mcp.backend('b3', '127.0.0.1', 11513) + local pools = { + test = mcp.pool({b1, b2, b3}) + } + return pools + elseif mode == "betable" then + local b1 = mcp.backend({ label = "b1", host = "127.0.0.1", port = 11511, + connecttimeout = 2, retrytimeout = 5, readtimeout = 0.1, + failurelimit = 0 }) + local b2 = mcp.backend({ label = "b2", host = "127.0.0.1", port = 11512, + connecttimeout = 2, retrytimeout = 5, readtimeout = 5 }) + local b3 = mcp.backend({ label = "b3", host = "127.0.0.1", port = 11513, + connecttimeout = 5, retrytimeout = 5, readtimeout = 5 }) + local pools = { test = mcp.pool({b1, b2, b3}) } @@ -24,7 +39,7 @@ function mcp_config_routes(zones) if mode == "none" then mcp.attach(mcp.CMD_MG, function(r) return "SERVER_ERROR no mg route\r\n" end) mcp.attach(mcp.CMD_MS, function(r) return "SERVER_ERROR no ms route\r\n" end) - elseif mode == "start" then + elseif mode == "start" or mode == "betable" then mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end) mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end) end diff --git a/t/proxyconfig.t b/t/proxyconfig.t index 2380997..9561415 100644 --- a/t/proxyconfig.t +++ b/t/proxyconfig.t @@ -119,7 +119,7 @@ is(<$watcher>, "OK\r\n", "watcher enabled"); # Try sending something. my $cmd = "mg foo v\r\n"; print $ps $cmd; - my @readable = $s->can_read(1); + my @readable = $s->can_read(0.25); is(scalar @readable, 1, "only one backend became readable"); my $be = shift @readable; is(scalar <$be>, $cmd, "metaget passthrough"); @@ -127,6 +127,42 @@ is(<$watcher>, "OK\r\n", "watcher enabled"); is(scalar <$ps>, "EN\r\n", "miss received"); } +# Test backend table arguments and per-backend time overrides +{ + # This should create three new backend sockets + write_modefile('return "betable"'); + $p_srv->reload(); + wait_reload($watcher); + + # sleep a short time; b1 should have a very short timeout and the + # others are long. + select(undef, undef, undef, 0.5); + + my $s = IO::Select->new(); + for my $msrv (@mocksrvs) { + $s->add($msrv); + } + my @readable = $s->can_read(0.25); + # All three backends should have changed despite having the same label, + # host, and port arguments. + is(scalar @readable, 3, "all listeners became readable"); + + like(<$watcher>, qr/ts=(\S+) gid=\d+ type=proxy_backend error=timeout name=\S+ port=11511/, "one backend timed out connecting"); + + for my $msrv (@readable) { + my $be = $msrv->accept(); + ok(defined $be, "mock backend accepted"); + like(<$be>, qr/version/, "received version command"); + print $be "VERSION 1.0.0-mock\r\n"; + } + + # reload again and ensure only the bad socket became available. + $p_srv->reload(); + wait_reload($watcher); + @readable = $s->can_read(0.5); + is(scalar @readable, 1, "only one listener became readable"); +} + # TODO: # remove backends # do dead sockets close? -- cgit v1.2.1