diff options
author | dormando <dormando@rydia.net> | 2023-01-31 17:20:30 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2023-02-01 14:28:29 -0800 |
commit | 58d8f40a90ee22d78adc0ae99f4b915e65f69be6 (patch) | |
tree | 4516a274f0f4a692db97ed235f400c4123534092 /proxy_network.c | |
parent | 0d8ebbfb823c94e5ce68550c9ae186d8af789446 (diff) | |
download | memcached-58d8f40a90ee22d78adc0ae99f4b915e65f69be6.tar.gz |
proxy: add mcp.backend(t) for more overrides
For example:
local b1 = mcp.backend({ label = "b1", host = "127.0.0.1", port = 11511,
connecttimeout = 1, retrytimeout = 0.5, readtimeout = 0.1,
failurelimit = 11 })
... to allow for overriding connect/retry/etc. tunables on a per-backend
basis. If an option is not passed in, the global setting is used.
Diffstat (limited to 'proxy_network.c')
-rw-r--r-- | proxy_network.c | 100 |
1 files changed, 19 insertions, 81 deletions
diff --git a/proxy_network.c b/proxy_network.c index d93dd6f..d893f3f 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -40,7 +40,6 @@ static void proxy_backend_handler(const int fd, const short which, void *arg); static void proxy_beconn_handler(const int fd, const short which, void *arg); static void proxy_event_handler(evutil_socket_t fd, short which, void *arg); static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg); -static void proxy_event_updater(evutil_socket_t fd, short which, void *arg); static int _prep_pending_write(mcp_backend_t *be); static bool _post_pending_write(mcp_backend_t *be, ssize_t sent); static int _flush_pending_write(mcp_backend_t *be); @@ -171,10 +170,7 @@ static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timesp static void _proxy_evthr_evset_be_readvalidate(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts); static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t); static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t); -static void _proxy_evthr_evset_clock(proxy_event_thread_t *t); -static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe); static void _backend_failed_ur(mcp_backend_t *be); -struct __kernel_timespec updater_ts = {.tv_sec = 3, .tv_nsec = 0}; static void _flush_pending_write_ur(mcp_backend_t *be) { // Allow us to be called with an empty stack to prevent dev errors. @@ -185,7 +181,7 @@ static void _flush_pending_write_ur(mcp_backend_t *be) { int iovcnt = _prep_pending_write(be); // TODO: write timeout. - _proxy_evthr_evset_be_writev(be, iovcnt, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_writev(be, iovcnt, &be->tunables.read_ur); } // TODO: we shouldn't handle reads if a write is pending, so postwrite should @@ -200,7 +196,7 @@ static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) { // FIXME: sent == 0 is disconnected? I keep forgetting. 
if (sent == -EAGAIN || sent == -EWOULDBLOCK) { // didn't do any writing, wait for a writeable socket. - _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_wrpoll(be, &be->tunables.read_ur); } else { _reset_bad_backend(be, P_BE_FAIL_WRITING); _backend_failed_ur(be); @@ -209,31 +205,19 @@ static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) { if (_post_pending_write(be, sent)) { // commands were flushed, set read handler. - _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur); } if (be->io_next) { // still have unflushed commands, re-run write command. // writev can't "block if EAGAIN" in io_uring so far as I can tell, so // we have to switch to polling mode here. - _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_wrpoll(be, &be->tunables.read_ur); } // TODO: if rbufused != 0, push through drive machine? } -static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) { - proxy_event_thread_t *t = udata; - proxy_ctx_t *ctx = t->ctx; - - _proxy_evthr_evset_clock(t); - - // we reuse the "global stats" lock since it's hardly ever used. - STAT_L(ctx); - memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables)); - STAT_UL(ctx); -} - // No-op at the moment. when the linked timeout fires uring returns the // linked request (read/write/poll/etc) with an interrupted/timeout/cancelled // error. So we don't need to explicitly handle timeouts. 
@@ -245,7 +229,7 @@ static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *c static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) { mcp_backend_t *be = udata; - _proxy_evthr_evset_be_conn(be, &be->event_thread->tunables.connect_ur); + _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur); } static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) { @@ -259,19 +243,19 @@ static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) { sqe = io_uring_get_sqe(&be->event_thread->ring); // TODO (v2): NULL? - io_uring_prep_timeout(sqe, &be->event_thread->tunables.retry_ur, 0, 0); + io_uring_prep_timeout(sqe, &be->tunables.retry_ur, 0, 0); io_uring_sqe_set_data(sqe, &be->ur_te_ev); be->ur_te_ev.set = true; } static void _backend_failed_ur(mcp_backend_t *be) { - if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) { + if (++be->failed_count > be->tunables.backend_failure_limit) { P_DEBUG("%s: marking backend as bad\n", __func__); be->bad = true; _proxy_evthr_evset_be_retry(be); STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1); } else { - _proxy_evthr_evset_be_conn(be, &be->event_thread->tunables.connect_ur); + _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur); STAT_INCR(be->event_thread->ctx, backend_failed, 1); } } @@ -298,7 +282,7 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) { // TODO (v2): when exactly do we need to reset the backend handler? 
if (!STAILQ_EMPTY(&be->io_head)) { - _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur); } } @@ -308,7 +292,7 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) { be->can_write = true; _flush_pending_write_ur(be); - _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur); } // a backend with an outstanding new connection has become writeable. @@ -352,7 +336,7 @@ static void proxy_backend_beconn_ur(void *udata, struct io_uring_cqe *cqe) { // TODO: make validation optional. // set next handler on recv for validity check. - _proxy_evthr_evset_be_readvalidate(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur); + _proxy_evthr_evset_be_readvalidate(be, be->rbuf, READ_BUFFER_SIZE, &be->tunables.read_ur); } // TODO: share more code with proxy_beconn_handler @@ -443,7 +427,7 @@ static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) { // will auto-wake because the socket is writeable. be->connecting = true; be->can_write = false; - _proxy_evthr_evset_be_conn(be, &t->tunables.connect_ur); + _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur); } else { _reset_bad_backend(be, P_BE_FAIL_CONNECTING); _backend_failed_ur(be); @@ -654,17 +638,6 @@ static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len, io_uring_sqe_set_data(sqe, NULL); }*/ -static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) { - struct io_uring_sqe *sqe; - - sqe = io_uring_get_sqe(&t->ring); - // FIXME (v2): NULL? 
- - io_uring_prep_timeout(sqe, &updater_ts, 0, 0); - io_uring_sqe_set_data(sqe, &t->ur_clock_event); - t->ur_clock_event.set = true; -} - static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t) { struct io_uring_sqe *sqe; P_DEBUG("%s: setting: %d\n", __func__, t->ur_benotify_event.set); @@ -745,28 +718,6 @@ void *proxy_event_thread_ur(void *arg) { } #endif // HAVE_LIBURING -// We need to get timeout/retry/etc updates to the event thread(s) -// occasionally. I'd like to have a better inteface around this where updates -// are shipped directly; but this is good enough to start with. -static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) { - proxy_event_thread_t *t = arg; - proxy_ctx_t *ctx = t->ctx; - - // TODO (v2): double check how much of this boilerplate is still necessary? - // reschedule the clock event. - evtimer_del(&t->clock_event); - - evtimer_set(&t->clock_event, proxy_event_updater, t); - event_base_set(t->base, &t->clock_event); - struct timeval rate = {.tv_sec = 3, .tv_usec = 0}; - evtimer_add(&t->clock_event, &rate); - - // we reuse the "global stats" lock since it's hardly ever used. - STAT_L(ctx); - memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables)); - STAT_UL(ctx); -} - static void _cleanup_backend(mcp_backend_t *be) { #ifdef HAVE_LIBURING if (be->event_thread->use_uring) { @@ -817,7 +768,6 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) { #endif beconn_head_t head; - struct timeval tmp_time = t->tunables.connect; STAILQ_INIT(&head); pthread_mutex_lock(&t->mutex); @@ -850,7 +800,7 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) { // will auto-wake because the socket is writeable. 
be->connecting = true; be->can_write = false; - _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); + _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, be->tunables.connect, proxy_beconn_handler); } else { _reset_bad_backend(be, P_BE_FAIL_CONNECTING); _backend_failed(be); @@ -891,7 +841,6 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { // Re-walk each backend and check set event as required. mcp_backend_t *be = NULL; - struct timeval tmp_time = t->tunables.read; // FIXME (v2): _set_event() is buggy, see notes on function. STAILQ_FOREACH(be, &t->be_head, be_next) { @@ -908,7 +857,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { _backend_failed(be); } else { flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT; - _set_event(be, t->base, flags, tmp_time, proxy_backend_handler); + _set_event(be, t->base, flags, be->tunables.read, proxy_backend_handler); } } } @@ -1205,7 +1154,7 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { static void proxy_backend_retry_handler(const int fd, const short which, void *arg) { mcp_backend_t *be = arg; assert(which & EV_TIMEOUT); - struct timeval tmp_time = be->event_thread->tunables.retry; + struct timeval tmp_time = be->tunables.retry; _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); } @@ -1223,8 +1172,8 @@ static void proxy_backend_retry_handler(const int fd, const short which, void *a // block them instead. 
That's more challenging so leaving a note instead // of doing this now :) static void _backend_failed(mcp_backend_t *be) { - struct timeval tmp_time = be->event_thread->tunables.retry; - if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) { + struct timeval tmp_time = be->tunables.retry; + if (++be->failed_count > be->tunables.backend_failure_limit) { P_DEBUG("%s: marking backend as bad\n", __func__); be->bad = true; _set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler); @@ -1404,7 +1353,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { assert(arg != NULL); mcp_backend_t *be = arg; int flags = EV_TIMEOUT; - struct timeval tmp_time = be->event_thread->tunables.read; + struct timeval tmp_time = be->tunables.read; if (which & EV_TIMEOUT) { P_DEBUG("%s: backend timed out while connecting\n", __func__); @@ -1504,7 +1453,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { static void proxy_backend_handler(const int fd, const short which, void *arg) { mcp_backend_t *be = arg; int flags = EV_TIMEOUT; - struct timeval tmp_time = be->event_thread->tunables.read; + struct timeval tmp_time = be->tunables.read; if (which & EV_TIMEOUT) { P_DEBUG("%s: timeout received, killing backend queue\n", __func__); @@ -1616,12 +1565,6 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) { t->ur_benotify_event.set = false; _proxy_evthr_evset_benotifier(t); - // periodic data updater for event thread - t->ur_clock_event.cb = proxy_event_updater_ur; - t->ur_clock_event.udata = t; - t->ur_clock_event.set = false; - _proxy_evthr_evset_clock(t); - t->use_uring = true; return; } else { @@ -1659,11 +1602,6 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) { EV_READ | EV_PERSIST, proxy_event_beconn, t); #endif - evtimer_set(&t->clock_event, proxy_event_updater, t); - event_base_set(t->base, &t->clock_event); - struct timeval rate = {.tv_sec = 3, .tv_usec = 
0}; - evtimer_add(&t->clock_event, &rate); - event_base_set(t->base, &t->notify_event); if (event_add(&t->notify_event, 0) == -1) { fprintf(stderr, "Can't monitor libevent notify pipe\n"); |