From 833a7234bbaed264a9973141850a23df4eb1b939 Mon Sep 17 00:00:00 2001 From: dormando Date: Tue, 21 Feb 2023 12:54:07 -0800 Subject: proxy: redo libevent handling code The event handling code was unoptimized and temporary; it was slated for a rewrite for performance and non-critical bugs alone. However the old code may be causing critical bugs so it's being rewritten now. Fixes: - backend disconnects are detected immediately instead of on the next time they are used. - backend reconnects happen _after_ the retry timeout, not before - use a persistent read handler and a temporary write handler to avoid constantly calling epoll_ctl syscalls for potential performance boost. Updated some tests for proxyconfig.t as it was picking up the disconnects immediately. Unrelated to a timing issue I resolved to the benchmark. --- proxy_network.c | 221 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 143 insertions(+), 78 deletions(-) (limited to 'proxy_network.c') diff --git a/proxy_network.c b/proxy_network.c index 3983ec5..239b0d4 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -41,12 +41,17 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg); static void proxy_event_handler(evutil_socket_t fd, short which, void *arg); static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg); static int _prep_pending_write(mcp_backend_t *be); -static bool _post_pending_write(mcp_backend_t *be, ssize_t sent); +static void _post_pending_write(mcp_backend_t *be, ssize_t sent); static int _flush_pending_write(mcp_backend_t *be); static void _cleanup_backend(mcp_backend_t *be); static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err); static void _backend_failed(mcp_backend_t *be); -static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback); +static void _set_main_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval *t, event_callback_fn callback); +static void _stop_main_event(mcp_backend_t *be); +static void _start_write_event(mcp_backend_t *be); +static void _stop_write_event(mcp_backend_t *be); +static void _start_timeout_event(mcp_backend_t *be); +static void _stop_timeout_event(mcp_backend_t *be); static int proxy_backend_drive_machine(mcp_backend_t *be); /* Helper routines common to io_uring and libevent modes */ @@ -725,12 +730,17 @@ static void _cleanup_backend(mcp_backend_t *be) { } else { #endif // remove any pending events. - int pending = 0; - if (event_initialized(&be->event)) { - pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL); + int pending = event_pending(&be->main_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL); + if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) { + event_del(&be->main_event); // an error to call event_del() without event. } + pending = event_pending(&be->write_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL); if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) { - event_del(&be->event); // an error to call event_del() without event. + event_del(&be->write_event); // an error to call event_del() without event. + } + pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL); + if ((pending & (EV_TIMEOUT)) != 0) { + event_del(&be->timeout_event); // an error to call event_del() without event. } #ifdef HAVE_LIBURING } @@ -793,14 +803,22 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) { } else { be->transferred = true; be->event_thread = t; + // assign the initial events to the backend, so we don't have to + // constantly check if they were initialized yet elsewhere. + // note these events will not fire until event_add() is called. int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); + event_assign(&be->main_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be); + event_assign(&be->write_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be); + event_assign(&be->timeout_event, t->base, -1, EV_TIMEOUT, proxy_backend_handler, be); + if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) { // if we're already connected for some reason, still push it // through the connection handler to keep the code unified. It // will auto-wake because the socket is writeable. be->connecting = true; be->can_write = false; - _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, be->tunables.connect, proxy_beconn_handler); + // kick off the event we intialized above. + event_add(&be->main_event, &be->tunables.connect); } else { _reset_bad_backend(be, P_BE_FAIL_CONNECTING); _backend_failed(be); @@ -855,9 +873,13 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { if (flags == -1) { _reset_bad_backend(be, P_BE_FAIL_WRITING); _backend_failed(be); - } else { - flags |= EV_TIMEOUT; - _set_event(be, t->base, flags, be->tunables.read, proxy_backend_handler); + } else if (flags & EV_WRITE) { + // only get here because we need to kick off the write handler + _start_write_event(be); + } + + if (be->pending_read) { + _start_timeout_event(be); } } } @@ -876,28 +898,58 @@ void *proxy_event_thread(void *arg) { return NULL; } -// FIXME (v2): if we use the newer API the various pending checks can be adjusted. -static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback) { - // FIXME (v2): chicken and egg. - // can't check if pending if the structure is was calloc'ed (sigh) - // don't want to double test here. should be able to event_assign but - // not add anything during initialization, but need the owner thread's - // event base. - int pending = 0; - if (event_initialized(&be->event)) { - pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL); - } - if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) { - event_del(&be->event); // replace existing event. +static void _set_main_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval *t, event_callback_fn callback) { + int pending = event_pending(&be->main_event, EV_READ|EV_WRITE, NULL); + if ((pending & (EV_READ|EV_WRITE)) != 0) { + event_del(&be->main_event); // replace existing event. } - // if we can't write, we could be connecting. - // TODO (v2): always check for READ in case some commands were sent - // successfully? The flags could be tracked on *be and reset in the - // handler, perhaps? - event_assign(&be->event, base, mcmc_fd(be->client), + event_assign(&be->main_event, base, mcmc_fd(be->client), flags, callback, be); - event_add(&be->event, &t); + event_add(&be->main_event, t); +} + +static void _stop_main_event(mcp_backend_t *be) { + int pending = event_pending(&be->main_event, EV_READ|EV_WRITE, NULL); + if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) == 0) { + return; + } + event_del(&be->write_event); +} + +static void _start_write_event(mcp_backend_t *be) { + int pending = event_pending(&be->main_event, EV_WRITE, NULL); + if ((pending & (EV_WRITE|EV_TIMEOUT)) != 0) { + return; + } + // FIXME: wasn't there a write timeout? + event_add(&be->write_event, &be->tunables.read); +} + +static void _stop_write_event(mcp_backend_t *be) { + int pending = event_pending(&be->main_event, EV_WRITE, NULL); + if ((pending & (EV_WRITE|EV_TIMEOUT)) == 0) { + return; + } + event_del(&be->write_event); +} + +// handle the read timeouts with a side event, so we can stick with a +// persistent listener (optimization + catch disconnects faster) +static void _start_timeout_event(mcp_backend_t *be) { + int pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL); + if ((pending & (EV_TIMEOUT)) != 0) { + return; + } + event_add(&be->timeout_event, &be->tunables.read); +} + +static void _stop_timeout_event(mcp_backend_t *be) { + int pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL); + if ((pending & (EV_TIMEOUT)) == 0) { + return; + } + event_del(&be->timeout_event); } // NOTES: @@ -1106,6 +1158,7 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { // set the head here. when we break the head will be correct. STAILQ_REMOVE_HEAD(&be->io_head, io_next); be->depth--; + be->pending_read--; // have to do the q->count-- and == 0 and redispatch_conn() // stuff here. The moment we call return_io here we // don't own *p anymore. @@ -1150,12 +1203,39 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { return flags; } +static void _backend_reconnect(mcp_backend_t *be) { + int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); + if (status == MCMC_CONNECTED) { + // TODO (v2): unexpected but lets let it be here. + be->connecting = false; + be->can_write = true; + } else if (status == MCMC_CONNECTING) { + be->connecting = true; + be->can_write = false; + } else { + // TODO (v2): failed to immediately re-establish the connection. + // need to put the BE into a bad/retry state. + // FIXME (v2): until we get an event to specifically handle connecting and + // bad server handling, attempt to force a reconnect here the next + // time a request comes through. + // The event thread will attempt to write to the backend, fail, then + // end up in this routine again. + be->connecting = false; + be->can_write = true; + } + // re-create the write handler for the new file descriptor. + // the main event will be re-assigned after this call. + event_assign(&be->write_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be); + // do not need to re-assign the timer event because it's not tied to fd +} + // All we need to do here is schedule the backend to attempt to connect again. static void proxy_backend_retry_handler(const int fd, const short which, void *arg) { mcp_backend_t *be = arg; assert(which & EV_TIMEOUT); struct timeval tmp_time = be->tunables.retry; - _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); + _backend_reconnect(be); + _set_main_event(be, be->event_thread->base, EV_WRITE, &tmp_time, proxy_beconn_handler); } // currently just for timeouts, but certain errors should consider a backend @@ -1176,11 +1256,12 @@ static void _backend_failed(mcp_backend_t *be) { if (++be->failed_count > be->tunables.backend_failure_limit) { P_DEBUG("%s: marking backend as bad\n", __func__); be->bad = true; - _set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler); + _set_main_event(be, be->event_thread->base, EV_TIMEOUT, &tmp_time, proxy_backend_retry_handler); STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1); } else { STAT_INCR(be->event_thread->ctx, backend_failed, 1); - _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); + _backend_reconnect(be); + _set_main_event(be, be->event_thread->base, EV_WRITE, &tmp_time, proxy_beconn_handler); } } @@ -1214,26 +1295,13 @@ static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) { // reset buffer to blank state. be->rbufused = 0; + be->pending_read = 0; + // allow the _backend_failed() routine to connect when ready. + _stop_write_event(be); + _stop_main_event(be); + _stop_timeout_event(be); mcmc_disconnect(be->client); - int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); - if (status == MCMC_CONNECTED) { - // TODO (v2): unexpected but lets let it be here. - be->connecting = false; - be->can_write = true; - } else if (status == MCMC_CONNECTING) { - be->connecting = true; - be->can_write = false; - } else { - // TODO (v2): failed to immediately re-establish the connection. - // need to put the BE into a bad/retry state. - // FIXME (v2): until we get an event to specifically handle connecting and - // bad server handling, attempt to force a reconnect here the next - // time a request comes through. - // The event thread will attempt to write to the backend, fail, then - // end up in this routine again. - be->connecting = false; - be->can_write = true; - } + // we leave the main event alone, because be_failed() always overwrites. return 0; } @@ -1266,13 +1334,12 @@ static int _prep_pending_write(mcp_backend_t *be) { } // returns true if any pending writes were fully flushed. -static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) { +static void _post_pending_write(mcp_backend_t *be, ssize_t sent) { io_pending_proxy_t *io = be->io_next; if (io == NULL) { io = STAILQ_FIRST(&be->io_head); } - bool did_flush = false; for (; io; io = STAILQ_NEXT(io, io_next)) { bool flushed = true; if (io->flushed) @@ -1297,10 +1364,10 @@ static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) { } } io->flushed = flushed; - if (flushed) { - did_flush = flushed; + be->pending_read++; } + if (sent <= 0) { // really shouldn't be negative, though. assert(sent >= 0); @@ -1314,8 +1381,6 @@ static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) { } else { be->io_next = NULL; } - - return did_flush; } static int _flush_pending_write(mcp_backend_t *be) { @@ -1329,9 +1394,7 @@ static int _flush_pending_write(mcp_backend_t *be) { ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt); if (sent > 0) { - if (_post_pending_write(be, sent)) { - flags |= EV_READ; - } + _post_pending_write(be, sent); // still have unflushed pending IO's, check for write and re-loop. if (be->io_next) { be->can_write = false; @@ -1370,7 +1433,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { if (_proxy_beconn_checkconnect(be) == -1) { return; } - _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); + _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler); } // TODO: currently never taken, until validation is made optional. @@ -1382,6 +1445,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { return; } flags |= res; + // FIXME: set write event? } } @@ -1398,7 +1462,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { // Needed more data for a version line, somehow. I feel like // this should set off some alarms, but it is possible. if (r.code == MCMC_WANT_READ) { - _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); + _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler); return; } @@ -1427,7 +1491,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { _backend_failed(be); return; } - _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); + _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler); return; } @@ -1438,14 +1502,18 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { _backend_failed(be); return; } - flags |= res; + if (flags & EV_WRITE) { + _start_write_event(be); + } + if (be->pending_read) { + _start_timeout_event(be); + } } - // Still pending requests to read or write. - if (!be->validating && !STAILQ_EMPTY(&be->io_head)) { - _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler); + // switch to the primary persistent read event. + if (!be->validating) { + _set_main_event(be, be->event_thread->base, EV_READ|EV_PERSIST, NULL, proxy_backend_handler); } - } // The libevent backend callback handler. @@ -1453,8 +1521,6 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { // state. static void proxy_backend_handler(const int fd, const short which, void *arg) { mcp_backend_t *be = arg; - int flags = EV_TIMEOUT; - struct timeval tmp_time = be->tunables.read; if (which & EV_TIMEOUT) { P_DEBUG("%s: timeout received, killing backend queue\n", __func__); @@ -1471,10 +1537,14 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) { _backend_failed(be); return; } - flags |= res; + if (res & EV_WRITE) { + _start_write_event(be); + } } if (which & EV_READ) { + // got a read event, always kill the pending read timer. + _stop_timeout_event(be); // We do the syscall here before diving into the state machine to allow a // common code path for io_uring/epoll int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused, @@ -1508,13 +1578,8 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) { #endif } - // Still pending requests to read or write. - if (!STAILQ_EMPTY(&be->io_head)) { - flags |= EV_READ; // FIXME (v2): might not be necessary here, but ensures we get a disconnect event. - if (!be->can_write) { - flags |= EV_WRITE; - } - _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler); + if (be->pending_read) { + _start_timeout_event(be); } } -- cgit v1.2.1