summaryrefslogtreecommitdiff
path: root/proxy_network.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2023-02-21 12:54:07 -0800
committerdormando <dormando@rydia.net>2023-02-22 11:12:51 -0800
commit833a7234bbaed264a9973141850a23df4eb1b939 (patch)
tree2b3c49440e923cef1f157085031387b86c51e7f0 /proxy_network.c
parentaf037c38cda6d9a07e82d0386202aa3c9d08faea (diff)
downloadmemcached-833a7234bbaed264a9973141850a23df4eb1b939.tar.gz
proxy: redo libevent handling code
The event handling code was unoptimized and temporary; it was slated for a rewrite for performance and non-critical bugs alone. However the old code may be causing critical bugs so it's being rewritten now. Fixes: - backend disconnects are detected immediately instead of on the next time they are used. - backend reconnects happen _after_ the retry timeout, not before - use a persistent read handler and a temporary write handler to avoid constantly calling epoll_ctl syscalls for potential performance boost. Updated some tests for proxyconfig.t as it was picking up the disconnects immediately. Unrelated to a timing issue I resolved to the benchmark.
Diffstat (limited to 'proxy_network.c')
-rw-r--r--proxy_network.c221
1 files changed, 143 insertions, 78 deletions
diff --git a/proxy_network.c b/proxy_network.c
index 3983ec5..239b0d4 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -41,12 +41,17 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg);
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg);
static int _prep_pending_write(mcp_backend_t *be);
-static bool _post_pending_write(mcp_backend_t *be, ssize_t sent);
+static void _post_pending_write(mcp_backend_t *be, ssize_t sent);
static int _flush_pending_write(mcp_backend_t *be);
static void _cleanup_backend(mcp_backend_t *be);
static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err);
static void _backend_failed(mcp_backend_t *be);
-static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
+static void _set_main_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval *t, event_callback_fn callback);
+static void _stop_main_event(mcp_backend_t *be);
+static void _start_write_event(mcp_backend_t *be);
+static void _stop_write_event(mcp_backend_t *be);
+static void _start_timeout_event(mcp_backend_t *be);
+static void _stop_timeout_event(mcp_backend_t *be);
static int proxy_backend_drive_machine(mcp_backend_t *be);
/* Helper routines common to io_uring and libevent modes */
@@ -725,12 +730,17 @@ static void _cleanup_backend(mcp_backend_t *be) {
} else {
#endif
// remove any pending events.
- int pending = 0;
- if (event_initialized(&be->event)) {
- pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
+ int pending = event_pending(&be->main_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
+ if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) {
+ event_del(&be->main_event); // an error to call event_del() without event.
}
+ pending = event_pending(&be->write_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) {
- event_del(&be->event); // an error to call event_del() without event.
+ event_del(&be->write_event); // an error to call event_del() without event.
+ }
+ pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL);
+ if ((pending & (EV_TIMEOUT)) != 0) {
+ event_del(&be->timeout_event); // an error to call event_del() without event.
}
#ifdef HAVE_LIBURING
}
@@ -793,14 +803,22 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) {
} else {
be->transferred = true;
be->event_thread = t;
+ // assign the initial events to the backend, so we don't have to
+ // constantly check if they were initialized yet elsewhere.
+ // note these events will not fire until event_add() is called.
int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
+ event_assign(&be->main_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be);
+ event_assign(&be->write_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be);
+ event_assign(&be->timeout_event, t->base, -1, EV_TIMEOUT, proxy_backend_handler, be);
+
if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) {
// if we're already connected for some reason, still push it
// through the connection handler to keep the code unified. It
// will auto-wake because the socket is writeable.
be->connecting = true;
be->can_write = false;
- _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, be->tunables.connect, proxy_beconn_handler);
+ // kick off the event we intialized above.
+ event_add(&be->main_event, &be->tunables.connect);
} else {
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
_backend_failed(be);
@@ -855,9 +873,13 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) {
if (flags == -1) {
_reset_bad_backend(be, P_BE_FAIL_WRITING);
_backend_failed(be);
- } else {
- flags |= EV_TIMEOUT;
- _set_event(be, t->base, flags, be->tunables.read, proxy_backend_handler);
+ } else if (flags & EV_WRITE) {
+ // only get here because we need to kick off the write handler
+ _start_write_event(be);
+ }
+
+ if (be->pending_read) {
+ _start_timeout_event(be);
}
}
}
@@ -876,28 +898,58 @@ void *proxy_event_thread(void *arg) {
return NULL;
}
-// FIXME (v2): if we use the newer API the various pending checks can be adjusted.
-static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback) {
- // FIXME (v2): chicken and egg.
- // can't check if pending if the structure is was calloc'ed (sigh)
- // don't want to double test here. should be able to event_assign but
- // not add anything during initialization, but need the owner thread's
- // event base.
- int pending = 0;
- if (event_initialized(&be->event)) {
- pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL);
- }
- if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) {
- event_del(&be->event); // replace existing event.
+static void _set_main_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval *t, event_callback_fn callback) {
+ int pending = event_pending(&be->main_event, EV_READ|EV_WRITE, NULL);
+ if ((pending & (EV_READ|EV_WRITE)) != 0) {
+ event_del(&be->main_event); // replace existing event.
}
- // if we can't write, we could be connecting.
- // TODO (v2): always check for READ in case some commands were sent
- // successfully? The flags could be tracked on *be and reset in the
- // handler, perhaps?
- event_assign(&be->event, base, mcmc_fd(be->client),
+ event_assign(&be->main_event, base, mcmc_fd(be->client),
flags, callback, be);
- event_add(&be->event, &t);
+ event_add(&be->main_event, t);
+}
+
+static void _stop_main_event(mcp_backend_t *be) {
+ int pending = event_pending(&be->main_event, EV_READ|EV_WRITE, NULL);
+ if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) == 0) {
+ return;
+ }
+ event_del(&be->write_event);
+}
+
+static void _start_write_event(mcp_backend_t *be) {
+ int pending = event_pending(&be->main_event, EV_WRITE, NULL);
+ if ((pending & (EV_WRITE|EV_TIMEOUT)) != 0) {
+ return;
+ }
+ // FIXME: wasn't there a write timeout?
+ event_add(&be->write_event, &be->tunables.read);
+}
+
+static void _stop_write_event(mcp_backend_t *be) {
+ int pending = event_pending(&be->main_event, EV_WRITE, NULL);
+ if ((pending & (EV_WRITE|EV_TIMEOUT)) == 0) {
+ return;
+ }
+ event_del(&be->write_event);
+}
+
+// handle the read timeouts with a side event, so we can stick with a
+// persistent listener (optimization + catch disconnects faster)
+static void _start_timeout_event(mcp_backend_t *be) {
+ int pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL);
+ if ((pending & (EV_TIMEOUT)) != 0) {
+ return;
+ }
+ event_add(&be->timeout_event, &be->tunables.read);
+}
+
+static void _stop_timeout_event(mcp_backend_t *be) {
+ int pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL);
+ if ((pending & (EV_TIMEOUT)) == 0) {
+ return;
+ }
+ event_del(&be->timeout_event);
}
// NOTES:
@@ -1106,6 +1158,7 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
// set the head here. when we break the head will be correct.
STAILQ_REMOVE_HEAD(&be->io_head, io_next);
be->depth--;
+ be->pending_read--;
// have to do the q->count-- and == 0 and redispatch_conn()
// stuff here. The moment we call return_io here we
// don't own *p anymore.
@@ -1150,12 +1203,39 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
return flags;
}
+static void _backend_reconnect(mcp_backend_t *be) {
+ int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
+ if (status == MCMC_CONNECTED) {
+ // TODO (v2): unexpected but lets let it be here.
+ be->connecting = false;
+ be->can_write = true;
+ } else if (status == MCMC_CONNECTING) {
+ be->connecting = true;
+ be->can_write = false;
+ } else {
+ // TODO (v2): failed to immediately re-establish the connection.
+ // need to put the BE into a bad/retry state.
+ // FIXME (v2): until we get an event to specifically handle connecting and
+ // bad server handling, attempt to force a reconnect here the next
+ // time a request comes through.
+ // The event thread will attempt to write to the backend, fail, then
+ // end up in this routine again.
+ be->connecting = false;
+ be->can_write = true;
+ }
+ // re-create the write handler for the new file descriptor.
+ // the main event will be re-assigned after this call.
+ event_assign(&be->write_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be);
+ // do not need to re-assign the timer event because it's not tied to fd
+}
+
// All we need to do here is schedule the backend to attempt to connect again.
static void proxy_backend_retry_handler(const int fd, const short which, void *arg) {
mcp_backend_t *be = arg;
assert(which & EV_TIMEOUT);
struct timeval tmp_time = be->tunables.retry;
- _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
+ _backend_reconnect(be);
+ _set_main_event(be, be->event_thread->base, EV_WRITE, &tmp_time, proxy_beconn_handler);
}
// currently just for timeouts, but certain errors should consider a backend
@@ -1176,11 +1256,12 @@ static void _backend_failed(mcp_backend_t *be) {
if (++be->failed_count > be->tunables.backend_failure_limit) {
P_DEBUG("%s: marking backend as bad\n", __func__);
be->bad = true;
- _set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler);
+ _set_main_event(be, be->event_thread->base, EV_TIMEOUT, &tmp_time, proxy_backend_retry_handler);
STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
} else {
STAT_INCR(be->event_thread->ctx, backend_failed, 1);
- _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler);
+ _backend_reconnect(be);
+ _set_main_event(be, be->event_thread->base, EV_WRITE, &tmp_time, proxy_beconn_handler);
}
}
@@ -1214,26 +1295,13 @@ static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) {
// reset buffer to blank state.
be->rbufused = 0;
+ be->pending_read = 0;
+ // allow the _backend_failed() routine to connect when ready.
+ _stop_write_event(be);
+ _stop_main_event(be);
+ _stop_timeout_event(be);
mcmc_disconnect(be->client);
- int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags);
- if (status == MCMC_CONNECTED) {
- // TODO (v2): unexpected but lets let it be here.
- be->connecting = false;
- be->can_write = true;
- } else if (status == MCMC_CONNECTING) {
- be->connecting = true;
- be->can_write = false;
- } else {
- // TODO (v2): failed to immediately re-establish the connection.
- // need to put the BE into a bad/retry state.
- // FIXME (v2): until we get an event to specifically handle connecting and
- // bad server handling, attempt to force a reconnect here the next
- // time a request comes through.
- // The event thread will attempt to write to the backend, fail, then
- // end up in this routine again.
- be->connecting = false;
- be->can_write = true;
- }
+ // we leave the main event alone, because be_failed() always overwrites.
return 0;
}
@@ -1266,13 +1334,12 @@ static int _prep_pending_write(mcp_backend_t *be) {
}
// returns true if any pending writes were fully flushed.
-static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) {
+static void _post_pending_write(mcp_backend_t *be, ssize_t sent) {
io_pending_proxy_t *io = be->io_next;
if (io == NULL) {
io = STAILQ_FIRST(&be->io_head);
}
- bool did_flush = false;
for (; io; io = STAILQ_NEXT(io, io_next)) {
bool flushed = true;
if (io->flushed)
@@ -1297,10 +1364,10 @@ static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) {
}
}
io->flushed = flushed;
-
if (flushed) {
- did_flush = flushed;
+ be->pending_read++;
}
+
if (sent <= 0) {
// really shouldn't be negative, though.
assert(sent >= 0);
@@ -1314,8 +1381,6 @@ static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) {
} else {
be->io_next = NULL;
}
-
- return did_flush;
}
static int _flush_pending_write(mcp_backend_t *be) {
@@ -1329,9 +1394,7 @@ static int _flush_pending_write(mcp_backend_t *be) {
ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt);
if (sent > 0) {
- if (_post_pending_write(be, sent)) {
- flags |= EV_READ;
- }
+ _post_pending_write(be, sent);
// still have unflushed pending IO's, check for write and re-loop.
if (be->io_next) {
be->can_write = false;
@@ -1370,7 +1433,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
if (_proxy_beconn_checkconnect(be) == -1) {
return;
}
- _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler);
}
// TODO: currently never taken, until validation is made optional.
@@ -1382,6 +1445,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
return;
}
flags |= res;
+ // FIXME: set write event?
}
}
@@ -1398,7 +1462,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
// Needed more data for a version line, somehow. I feel like
// this should set off some alarms, but it is possible.
if (r.code == MCMC_WANT_READ) {
- _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler);
return;
}
@@ -1427,7 +1491,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
_backend_failed(be);
return;
}
- _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler);
+ _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler);
return;
}
@@ -1438,14 +1502,18 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
_backend_failed(be);
return;
}
- flags |= res;
+ if (flags & EV_WRITE) {
+ _start_write_event(be);
+ }
+ if (be->pending_read) {
+ _start_timeout_event(be);
+ }
}
- // Still pending requests to read or write.
- if (!be->validating && !STAILQ_EMPTY(&be->io_head)) {
- _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler);
+ // switch to the primary persistent read event.
+ if (!be->validating) {
+ _set_main_event(be, be->event_thread->base, EV_READ|EV_PERSIST, NULL, proxy_backend_handler);
}
-
}
// The libevent backend callback handler.
@@ -1453,8 +1521,6 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
// state.
static void proxy_backend_handler(const int fd, const short which, void *arg) {
mcp_backend_t *be = arg;
- int flags = EV_TIMEOUT;
- struct timeval tmp_time = be->tunables.read;
if (which & EV_TIMEOUT) {
P_DEBUG("%s: timeout received, killing backend queue\n", __func__);
@@ -1471,10 +1537,14 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
_backend_failed(be);
return;
}
- flags |= res;
+ if (res & EV_WRITE) {
+ _start_write_event(be);
+ }
}
if (which & EV_READ) {
+ // got a read event, always kill the pending read timer.
+ _stop_timeout_event(be);
// We do the syscall here before diving into the state machine to allow a
// common code path for io_uring/epoll
int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused,
@@ -1508,13 +1578,8 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) {
#endif
}
- // Still pending requests to read or write.
- if (!STAILQ_EMPTY(&be->io_head)) {
- flags |= EV_READ; // FIXME (v2): might not be necessary here, but ensures we get a disconnect event.
- if (!be->can_write) {
- flags |= EV_WRITE;
- }
- _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler);
+ if (be->pending_read) {
+ _start_timeout_event(be);
}
}