summaryrefslogtreecommitdiff
path: root/proxy_network.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-01-31 17:20:30 -0800
committerdormando <dormando@rydia.net>2023-02-01 14:28:29 -0800
commit58d8f40a90ee22d78adc0ae99f4b915e65f69be6 (patch)
tree4516a274f0f4a692db97ed235f400c4123534092 /proxy_network.c
parent0d8ebbfb823c94e5ce68550c9ae186d8af789446 (diff)
downloadmemcached-58d8f40a90ee22d78adc0ae99f4b915e65f69be6.tar.gz
proxy: add mcp.backend(t) for more overrides
ie: local b1 = mcp.backend({ label = "b1", host = "127.0.0.1", port = 11511, connecttimeout = 1, retrytimeout = 0.5, readtimeout = 0.1, failurelimit = 11 }) ... to allow for overriding connect/retry/etc tunables on a per-backend basis. If not passed in the global settings are used.
Diffstat (limited to 'proxy_network.c')
-rw-r--r--proxy_network.c100
1 files changed, 19 insertions, 81 deletions
diff --git a/proxy_network.c b/proxy_network.c
index d93dd6f..d893f3f 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -40,7 +40,6 @@ static void proxy_backend_handler(const int fd, const short which, void *arg);
static void proxy_beconn_handler(const int fd, const short which, void *arg);
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg);
-static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
static int _prep_pending_write(mcp_backend_t *be);
static bool _post_pending_write(mcp_backend_t *be, ssize_t sent);
static int _flush_pending_write(mcp_backend_t *be);
@@ -171,10 +170,7 @@ static void _proxy_evthr_evset_be_conn(mcp_backend_t *be, struct __kernel_timesp
static void _proxy_evthr_evset_be_readvalidate(mcp_backend_t *be, char *buf, size_t len, struct __kernel_timespec *ts);
static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t);
static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t);
-static void _proxy_evthr_evset_clock(proxy_event_thread_t *t);
-static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe);
static void _backend_failed_ur(mcp_backend_t *be);
-struct __kernel_timespec updater_ts = {.tv_sec = 3, .tv_nsec = 0};
static void _flush_pending_write_ur(mcp_backend_t *be) {
// Allow us to be called with an empty stack to prevent dev errors.
@@ -185,7 +181,7 @@ static void _flush_pending_write_ur(mcp_backend_t *be) {
int iovcnt = _prep_pending_write(be);
// TODO: write timeout.
- _proxy_evthr_evset_be_writev(be, iovcnt, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_writev(be, iovcnt, &be->tunables.read_ur);
}
// TODO: we shouldn't handle reads if a write is pending, so postwrite should
@@ -200,7 +196,7 @@ static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) {
// FIXME: sent == 0 is disconnected? I keep forgetting.
if (sent == -EAGAIN || sent == -EWOULDBLOCK) {
// didn't do any writing, wait for a writeable socket.
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_wrpoll(be, &be->tunables.read_ur);
} else {
_reset_bad_backend(be, P_BE_FAIL_WRITING);
_backend_failed_ur(be);
@@ -209,31 +205,19 @@ static void proxy_backend_postwrite_ur(void *udata, struct io_uring_cqe *cqe) {
if (_post_pending_write(be, sent)) {
// commands were flushed, set read handler.
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur);
}
if (be->io_next) {
// still have unflushed commands, re-run write command.
// writev can't "block if EAGAIN" in io_uring so far as I can tell, so
// we have to switch to polling mode here.
- _proxy_evthr_evset_be_wrpoll(be, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_wrpoll(be, &be->tunables.read_ur);
}
// TODO: if rbufused != 0, push through drive machine?
}
-static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe) {
- proxy_event_thread_t *t = udata;
- proxy_ctx_t *ctx = t->ctx;
-
- _proxy_evthr_evset_clock(t);
-
- // we reuse the "global stats" lock since it's hardly ever used.
- STAT_L(ctx);
- memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
- STAT_UL(ctx);
-}
-
// No-op at the moment. when the linked timeout fires uring returns the
// linked request (read/write/poll/etc) with an interrupted/timeout/cancelled
// error. So we don't need to explicitly handle timeouts.
@@ -245,7 +229,7 @@ static void proxy_backend_timeout_handler_ur(void *udata, struct io_uring_cqe *c
static void proxy_backend_retry_handler_ur(void *udata, struct io_uring_cqe *cqe) {
mcp_backend_t *be = udata;
- _proxy_evthr_evset_be_conn(be, &be->event_thread->tunables.connect_ur);
+ _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur);
}
static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
@@ -259,19 +243,19 @@ static void _proxy_evthr_evset_be_retry(mcp_backend_t *be) {
sqe = io_uring_get_sqe(&be->event_thread->ring);
// TODO (v2): NULL?
- io_uring_prep_timeout(sqe, &be->event_thread->tunables.retry_ur, 0, 0);
+ io_uring_prep_timeout(sqe, &be->tunables.retry_ur, 0, 0);
io_uring_sqe_set_data(sqe, &be->ur_te_ev);
be->ur_te_ev.set = true;
}
static void _backend_failed_ur(mcp_backend_t *be) {
- if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
+ if (++be->failed_count > be->tunables.backend_failure_limit) {
P_DEBUG("%s: marking backend as bad\n", __func__);
be->bad = true;
_proxy_evthr_evset_be_retry(be);
STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
} else {
- _proxy_evthr_evset_be_conn(be, &be->event_thread->tunables.connect_ur);
+ _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur);
STAT_INCR(be->event_thread->ctx, backend_failed, 1);
}
}
@@ -298,7 +282,7 @@ static void proxy_backend_handler_ur(void *udata, struct io_uring_cqe *cqe) {
// TODO (v2): when exactly do we need to reset the backend handler?
if (!STAILQ_EMPTY(&be->io_head)) {
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur);
}
}
@@ -308,7 +292,7 @@ static void proxy_backend_wrhandler_ur(void *udata, struct io_uring_cqe *cqe) {
be->can_write = true;
_flush_pending_write_ur(be);
- _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_read(be, be->rbuf+be->rbufused, READ_BUFFER_SIZE-be->rbufused, &be->tunables.read_ur);
}
// a backend with an outstanding new connection has become writeable.
@@ -352,7 +336,7 @@ static void proxy_backend_beconn_ur(void *udata, struct io_uring_cqe *cqe) {
// TODO: make validation optional.
// set next handler on recv for validity check.
- _proxy_evthr_evset_be_readvalidate(be, be->rbuf, READ_BUFFER_SIZE, &be->event_thread->tunables.read_ur);
+ _proxy_evthr_evset_be_readvalidate(be, be->rbuf, READ_BUFFER_SIZE, &be->tunables.read_ur);
}
// TODO: share more code with proxy_beconn_handler
@@ -443,7 +427,7 @@ static void proxy_beconn_handler_ur(void *udata, struct io_uring_cqe *cqe) {
// will auto-wake because the socket is writeable.
be->connecting = true;
be->can_write = false;
- _proxy_evthr_evset_be_conn(be, &t->tunables.connect_ur);
+ _proxy_evthr_evset_be_conn(be, &be->tunables.connect_ur);
} else {
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed_ur(be);
@@ -654,17 +638,6 @@ static void _proxy_evthr_evset_be_read(mcp_backend_t *be, char *buf, size_t len,
io_uring_sqe_set_data(sqe, NULL);
}*/
-static void _proxy_evthr_evset_clock(proxy_event_thread_t *t) {
- struct io_uring_sqe *sqe;
-
- sqe = io_uring_get_sqe(&t->ring);
- // FIXME (v2): NULL?
-
- io_uring_prep_timeout(sqe, &updater_ts, 0, 0);
- io_uring_sqe_set_data(sqe, &t->ur_clock_event);
- t->ur_clock_event.set = true;
-}
-
static void _proxy_evthr_evset_benotifier(proxy_event_thread_t *t) {
struct io_uring_sqe *sqe;
P_DEBUG("%s: setting: %d\n", __func__, t->ur_benotify_event.set);
@@ -745,28 +718,6 @@ void *proxy_event_thread_ur(void *arg) {
}
#endif // HAVE_LIBURING
-// We need to get timeout/retry/etc updates to the event thread(s)
-// occasionally. I'd like to have a better inteface around this where updates
-// are shipped directly; but this is good enough to start with.
-static void proxy_event_updater(evutil_socket_t fd, short which, void *arg) {
- proxy_event_thread_t *t = arg;
- proxy_ctx_t *ctx = t->ctx;
-
- // TODO (v2): double check how much of this boilerplate is still necessary?
- // reschedule the clock event.
- evtimer_del(&t->clock_event);
-
- evtimer_set(&t->clock_event, proxy_event_updater, t);
- event_base_set(t->base, &t->clock_event);
- struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
- evtimer_add(&t->clock_event, &rate);
-
- // we reuse the "global stats" lock since it's hardly ever used.
- STAT_L(ctx);
- memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
- STAT_UL(ctx);
-}
-
static void _cleanup_backend(mcp_backend_t *be) {
#ifdef HAVE_LIBURING
if (be->event_thread->use_uring) {
@@ -817,7 +768,6 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
#endif
beconn_head_t head;
- struct timeval tmp_time = t->tunables.connect;
STAILQ_INIT(&head);
pthread_mutex_lock(&t->mutex);
@@ -850,7 +800,7 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
// will auto-wake because the socket is writeable.
be->connecting = true;
be->can_write = false;
- _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
+ _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, be->tunables.connect, proxy_beconn_handler);
} else {
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed(be);
@@ -891,7 +841,6 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
// Re-walk each backend and check set event as required.
mcp_backend_t *be = NULL;
- struct timeval tmp_time = t->tunables.read;
// FIXME (v2): _set_event() is buggy, see notes on function.
STAILQ_FOREACH(be, &t->be_head, be_next) {
@@ -908,7 +857,7 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
_backend_failed(be);
} else {
flags = be->can_write ? EV_READ|EV_TIMEOUT : EV_READ|EV_WRITE|EV_TIMEOUT;
- _set_event(be, t->base, flags, tmp_time, proxy_backend_handler);
+ _set_event(be, t->base, flags, be->tunables.read, proxy_backend_handler);
}
}
}
@@ -1205,7 +1154,7 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
static void proxy_backend_retry_handler(const int fd, const short which, void *arg) {
mcp_backend_t *be = arg;
assert(which & EV_TIMEOUT);
- struct timeval tmp_time = be->event_thread->tunables.retry;
+ struct timeval tmp_time = be->tunables.retry;
_set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
}
@@ -1223,8 +1172,8 @@ static void proxy_backend_retry_handler(const int fd, const short which, void *a
// block them instead. That's more challenging so leaving a note instead
// of doing this now :)
static void _backend_failed(mcp_backend_t *be) {
- struct timeval tmp_time = be->event_thread->tunables.retry;
- if (++be->failed_count > be->event_thread->tunables.backend_failure_limit) {
+ struct timeval tmp_time = be->tunables.retry;
+ if (++be->failed_count > be->tunables.backend_failure_limit) {
P_DEBUG("%s: marking backend as bad\n", __func__);
be->bad = true;
_set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler);
@@ -1404,7 +1353,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
assert(arg != NULL);
mcp_backend_t *be = arg;
int flags = EV_TIMEOUT;
- struct timeval tmp_time = be->event_thread->tunables.read;
+ struct timeval tmp_time = be->tunables.read;
if (which & EV_TIMEOUT) {
P_DEBUG("%s: backend timed out while connecting\n", __func__);
@@ -1504,7 +1453,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
static void proxy_backend_handler(const int fd, const short which, void *arg) {
mcp_backend_t *be = arg;
int flags = EV_TIMEOUT;
- struct timeval tmp_time = be->event_thread->tunables.read;
+ struct timeval tmp_time = be->tunables.read;
if (which & EV_TIMEOUT) {
P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
@@ -1616,12 +1565,6 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
t->ur_benotify_event.set = false;
_proxy_evthr_evset_benotifier(t);
- // periodic data updater for event thread
- t->ur_clock_event.cb = proxy_event_updater_ur;
- t->ur_clock_event.udata = t;
- t->ur_clock_event.set = false;
- _proxy_evthr_evset_clock(t);
-
t->use_uring = true;
return;
} else {
@@ -1659,11 +1602,6 @@ void proxy_init_evthread_events(proxy_event_thread_t *t) {
EV_READ | EV_PERSIST, proxy_event_beconn, t);
#endif
- evtimer_set(&t->clock_event, proxy_event_updater, t);
- event_base_set(t->base, &t->clock_event);
- struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
- evtimer_add(&t->clock_event, &rate);
-
event_base_set(t->base, &t->notify_event);
if (event_add(&t->notify_event, 0) == -1) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");