From 833a7234bbaed264a9973141850a23df4eb1b939 Mon Sep 17 00:00:00 2001 From: dormando Date: Tue, 21 Feb 2023 12:54:07 -0800 Subject: proxy: redo libevent handling code The event handling code was unoptimized and temporary; it was slated for a rewrite for performance and non-critical bugs alone. However the old code may be causing critical bugs so it's being rewritten now. Fixes: - backend disconnects are detected immediately instead of on the next time they are used. - backend reconnects happen _after_ the retry timeout, not before - use a persistent read handler and a temporary write handler to avoid constantly calling epoll_ctl syscalls for potential performance boost. Updated some tests for proxyconfig.t as it was picking up the disconnects immediately. Unrelated to a timing issue I resolved to the benchmark. --- proxy.h | 7 +- proxy_lua.c | 4 +- proxy_network.c | 221 ++++++++++++++++++++++++++++++++++++-------------------- t/proxyconfig.t | 10 ++- 4 files changed, 157 insertions(+), 85 deletions(-) diff --git a/proxy.h b/proxy.h index 171f695..23aabd8 100644 --- a/proxy.h +++ b/proxy.h @@ -323,7 +323,8 @@ struct mcp_backend_wrap_s { // FIXME: inline the mcmc client data. // TODO: event_thread -> something? union of owner type? struct mcp_backend_s { - int depth; + int depth; // total number of requests in queue + int pending_read; // number of requests written to socket, pending read. int failed_count; // number of fails (timeouts) in a row proxy_event_thread_t *event_thread; // event thread owning this backend. void *client; // mcmc client @@ -333,7 +334,9 @@ struct mcp_backend_s { io_pending_proxy_t *io_next; // next request to write. char *rbuf; // statically allocated read buffer. size_t rbufused; // currently active bytes in the buffer - struct event event; // libevent + struct event main_event; // libevent: changes role, mostly for main read events + struct event write_event; // libevent: only used when socket wbuf full + struct event timeout_event; // libevent: alarm for pending reads struct proxy_tunables tunables; #ifdef HAVE_LIBURING proxy_event_t ur_rd_ev; // liburing. diff --git a/proxy_lua.c b/proxy_lua.c index 2d38b8e..e6b50ae 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -337,7 +337,9 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la } // initialize libevent. - memset(&be->event, 0, sizeof(be->event)); + memset(&be->main_event, 0, sizeof(be->main_event)); + memset(&be->write_event, 0, sizeof(be->write_event)); + memset(&be->timeout_event, 0, sizeof(be->timeout_event)); // initialize the client be->client = malloc(mcmc_size(MCMC_OPTION_BLANK)); diff --git a/proxy_network.c b/proxy_network.c index 3983ec5..239b0d4 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -41,12 +41,17 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg); static void proxy_event_handler(evutil_socket_t fd, short which, void *arg); static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg); static int _prep_pending_write(mcp_backend_t *be); -static bool _post_pending_write(mcp_backend_t *be, ssize_t sent); +static void _post_pending_write(mcp_backend_t *be, ssize_t sent); static int _flush_pending_write(mcp_backend_t *be); static void _cleanup_backend(mcp_backend_t *be); static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err); static void _backend_failed(mcp_backend_t *be); -static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback); +static void _set_main_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval *t, event_callback_fn callback); +static void _stop_main_event(mcp_backend_t *be); +static void _start_write_event(mcp_backend_t *be); +static void _stop_write_event(mcp_backend_t *be); +static void _start_timeout_event(mcp_backend_t *be); +static void _stop_timeout_event(mcp_backend_t *be); static int proxy_backend_drive_machine(mcp_backend_t *be); /* Helper routines common to io_uring and libevent modes */ @@ -725,12 +730,17 @@ static void _cleanup_backend(mcp_backend_t *be) { } else { #endif // remove any pending events. - int pending = 0; - if (event_initialized(&be->event)) { - pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL); + int pending = event_pending(&be->main_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL); + if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) { + event_del(&be->main_event); // an error to call event_del() without event. } + pending = event_pending(&be->write_event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL); if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) { - event_del(&be->event); // an error to call event_del() without event. + event_del(&be->write_event); // an error to call event_del() without event. + } + pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL); + if ((pending & (EV_TIMEOUT)) != 0) { + event_del(&be->timeout_event); // an error to call event_del() without event. } #ifdef HAVE_LIBURING } @@ -793,14 +803,22 @@ static void proxy_event_beconn(evutil_socket_t fd, short which, void *arg) { } else { be->transferred = true; be->event_thread = t; + // assign the initial events to the backend, so we don't have to + // constantly check if they were initialized yet elsewhere. + // note these events will not fire until event_add() is called. int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); + event_assign(&be->main_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_beconn_handler, be); + event_assign(&be->write_event, t->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be); + event_assign(&be->timeout_event, t->base, -1, EV_TIMEOUT, proxy_backend_handler, be); + if (status == MCMC_CONNECTING || status == MCMC_CONNECTED) { // if we're already connected for some reason, still push it // through the connection handler to keep the code unified. It // will auto-wake because the socket is writeable. be->connecting = true; be->can_write = false; - _set_event(be, t->base, EV_WRITE|EV_TIMEOUT, be->tunables.connect, proxy_beconn_handler); + // kick off the event we intialized above. + event_add(&be->main_event, &be->tunables.connect); } else { _reset_bad_backend(be, P_BE_FAIL_CONNECTING); _backend_failed(be); @@ -855,9 +873,13 @@ static void proxy_event_handler(evutil_socket_t fd, short which, void *arg) { if (flags == -1) { _reset_bad_backend(be, P_BE_FAIL_WRITING); _backend_failed(be); - } else { - flags |= EV_TIMEOUT; - _set_event(be, t->base, flags, be->tunables.read, proxy_backend_handler); + } else if (flags & EV_WRITE) { + // only get here because we need to kick off the write handler + _start_write_event(be); + } + + if (be->pending_read) { + _start_timeout_event(be); } } } @@ -876,28 +898,58 @@ void *proxy_event_thread(void *arg) { return NULL; } -// FIXME (v2): if we use the newer API the various pending checks can be adjusted. -static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback) { - // FIXME (v2): chicken and egg. - // can't check if pending if the structure is was calloc'ed (sigh) - // don't want to double test here. should be able to event_assign but - // not add anything during initialization, but need the owner thread's - // event base. - int pending = 0; - if (event_initialized(&be->event)) { - pending = event_pending(&be->event, EV_READ|EV_WRITE|EV_TIMEOUT, NULL); - } - if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) != 0) { - event_del(&be->event); // replace existing event. +static void _set_main_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval *t, event_callback_fn callback) { + int pending = event_pending(&be->main_event, EV_READ|EV_WRITE, NULL); + if ((pending & (EV_READ|EV_WRITE)) != 0) { + event_del(&be->main_event); // replace existing event. } - // if we can't write, we could be connecting. - // TODO (v2): always check for READ in case some commands were sent - // successfully? The flags could be tracked on *be and reset in the - // handler, perhaps? - event_assign(&be->event, base, mcmc_fd(be->client), + event_assign(&be->main_event, base, mcmc_fd(be->client), flags, callback, be); - event_add(&be->event, &t); + event_add(&be->main_event, t); +} + +static void _stop_main_event(mcp_backend_t *be) { + int pending = event_pending(&be->main_event, EV_READ|EV_WRITE, NULL); + if ((pending & (EV_READ|EV_WRITE|EV_TIMEOUT)) == 0) { + return; + } + event_del(&be->write_event); +} + +static void _start_write_event(mcp_backend_t *be) { + int pending = event_pending(&be->main_event, EV_WRITE, NULL); + if ((pending & (EV_WRITE|EV_TIMEOUT)) != 0) { + return; + } + // FIXME: wasn't there a write timeout? + event_add(&be->write_event, &be->tunables.read); +} + +static void _stop_write_event(mcp_backend_t *be) { + int pending = event_pending(&be->main_event, EV_WRITE, NULL); + if ((pending & (EV_WRITE|EV_TIMEOUT)) == 0) { + return; + } + event_del(&be->write_event); +} + +// handle the read timeouts with a side event, so we can stick with a +// persistent listener (optimization + catch disconnects faster) +static void _start_timeout_event(mcp_backend_t *be) { + int pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL); + if ((pending & (EV_TIMEOUT)) != 0) { + return; + } + event_add(&be->timeout_event, &be->tunables.read); +} + +static void _stop_timeout_event(mcp_backend_t *be) { + int pending = event_pending(&be->timeout_event, EV_TIMEOUT, NULL); + if ((pending & (EV_TIMEOUT)) == 0) { + return; + } + event_del(&be->timeout_event); } // NOTES: @@ -1106,6 +1158,7 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { // set the head here. when we break the head will be correct. STAILQ_REMOVE_HEAD(&be->io_head, io_next); be->depth--; + be->pending_read--; // have to do the q->count-- and == 0 and redispatch_conn() // stuff here. The moment we call return_io here we // don't own *p anymore. @@ -1150,12 +1203,39 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { return flags; } +static void _backend_reconnect(mcp_backend_t *be) { + int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); + if (status == MCMC_CONNECTED) { + // TODO (v2): unexpected but lets let it be here. + be->connecting = false; + be->can_write = true; + } else if (status == MCMC_CONNECTING) { + be->connecting = true; + be->can_write = false; + } else { + // TODO (v2): failed to immediately re-establish the connection. + // need to put the BE into a bad/retry state. + // FIXME (v2): until we get an event to specifically handle connecting and + // bad server handling, attempt to force a reconnect here the next + // time a request comes through. + // The event thread will attempt to write to the backend, fail, then + // end up in this routine again. + be->connecting = false; + be->can_write = true; + } + // re-create the write handler for the new file descriptor. + // the main event will be re-assigned after this call. + event_assign(&be->write_event, be->event_thread->base, mcmc_fd(be->client), EV_WRITE|EV_TIMEOUT, proxy_backend_handler, be); + // do not need to re-assign the timer event because it's not tied to fd +} + // All we need to do here is schedule the backend to attempt to connect again. static void proxy_backend_retry_handler(const int fd, const short which, void *arg) { mcp_backend_t *be = arg; assert(which & EV_TIMEOUT); struct timeval tmp_time = be->tunables.retry; - _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); + _backend_reconnect(be); + _set_main_event(be, be->event_thread->base, EV_WRITE, &tmp_time, proxy_beconn_handler); } // currently just for timeouts, but certain errors should consider a backend @@ -1176,11 +1256,12 @@ static void _backend_failed(mcp_backend_t *be) { if (++be->failed_count > be->tunables.backend_failure_limit) { P_DEBUG("%s: marking backend as bad\n", __func__); be->bad = true; - _set_event(be, be->event_thread->base, EV_TIMEOUT, tmp_time, proxy_backend_retry_handler); + _set_main_event(be, be->event_thread->base, EV_TIMEOUT, &tmp_time, proxy_backend_retry_handler); STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1); } else { STAT_INCR(be->event_thread->ctx, backend_failed, 1); - _set_event(be, be->event_thread->base, EV_WRITE|EV_TIMEOUT, tmp_time, proxy_beconn_handler); + _backend_reconnect(be); + _set_main_event(be, be->event_thread->base, EV_WRITE, &tmp_time, proxy_beconn_handler); } } @@ -1214,26 +1295,13 @@ static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) { // reset buffer to blank state. be->rbufused = 0; + be->pending_read = 0; + // allow the _backend_failed() routine to connect when ready. + _stop_write_event(be); + _stop_main_event(be); + _stop_timeout_event(be); mcmc_disconnect(be->client); - int status = mcmc_connect(be->client, be->name, be->port, be->connect_flags); - if (status == MCMC_CONNECTED) { - // TODO (v2): unexpected but lets let it be here. - be->connecting = false; - be->can_write = true; - } else if (status == MCMC_CONNECTING) { - be->connecting = true; - be->can_write = false; - } else { - // TODO (v2): failed to immediately re-establish the connection. - // need to put the BE into a bad/retry state. - // FIXME (v2): until we get an event to specifically handle connecting and - // bad server handling, attempt to force a reconnect here the next - // time a request comes through. - // The event thread will attempt to write to the backend, fail, then - // end up in this routine again. - be->connecting = false; - be->can_write = true; - } + // we leave the main event alone, because be_failed() always overwrites. return 0; } @@ -1266,13 +1334,12 @@ static int _prep_pending_write(mcp_backend_t *be) { } // returns true if any pending writes were fully flushed. -static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) { +static void _post_pending_write(mcp_backend_t *be, ssize_t sent) { io_pending_proxy_t *io = be->io_next; if (io == NULL) { io = STAILQ_FIRST(&be->io_head); } - bool did_flush = false; for (; io; io = STAILQ_NEXT(io, io_next)) { bool flushed = true; if (io->flushed) @@ -1297,10 +1364,10 @@ static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) { } } io->flushed = flushed; - if (flushed) { - did_flush = flushed; + be->pending_read++; } + if (sent <= 0) { // really shouldn't be negative, though. assert(sent >= 0); @@ -1314,8 +1381,6 @@ static bool _post_pending_write(mcp_backend_t *be, ssize_t sent) { } else { be->io_next = NULL; } - - return did_flush; } static int _flush_pending_write(mcp_backend_t *be) { @@ -1329,9 +1394,7 @@ static int _flush_pending_write(mcp_backend_t *be) { ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt); if (sent > 0) { - if (_post_pending_write(be, sent)) { - flags |= EV_READ; - } + _post_pending_write(be, sent); // still have unflushed pending IO's, check for write and re-loop. if (be->io_next) { be->can_write = false; @@ -1370,7 +1433,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { if (_proxy_beconn_checkconnect(be) == -1) { return; } - _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); + _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler); } // TODO: currently never taken, until validation is made optional. @@ -1382,6 +1445,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { return; } flags |= res; + // FIXME: set write event? } } @@ -1398,7 +1462,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { // Needed more data for a version line, somehow. I feel like // this should set off some alarms, but it is possible. if (r.code == MCMC_WANT_READ) { - _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); + _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler); return; } @@ -1427,7 +1491,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { _backend_failed(be); return; } - _set_event(be, be->event_thread->base, EV_READ, tmp_time, proxy_beconn_handler); + _set_main_event(be, be->event_thread->base, EV_READ, &tmp_time, proxy_beconn_handler); return; } @@ -1438,14 +1502,18 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { _backend_failed(be); return; } - flags |= res; + if (flags & EV_WRITE) { + _start_write_event(be); + } + if (be->pending_read) { + _start_timeout_event(be); + } } - // Still pending requests to read or write. - if (!be->validating && !STAILQ_EMPTY(&be->io_head)) { - _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler); + // switch to the primary persistent read event. + if (!be->validating) { + _set_main_event(be, be->event_thread->base, EV_READ|EV_PERSIST, NULL, proxy_backend_handler); } - } // The libevent backend callback handler. @@ -1453,8 +1521,6 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) { // state. static void proxy_backend_handler(const int fd, const short which, void *arg) { mcp_backend_t *be = arg; - int flags = EV_TIMEOUT; - struct timeval tmp_time = be->tunables.read; if (which & EV_TIMEOUT) { P_DEBUG("%s: timeout received, killing backend queue\n", __func__); @@ -1471,10 +1537,14 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) { _backend_failed(be); return; } - flags |= res; + if (res & EV_WRITE) { + _start_write_event(be); + } } if (which & EV_READ) { + // got a read event, always kill the pending read timer. + _stop_timeout_event(be); // We do the syscall here before diving into the state machine to allow a // common code path for io_uring/epoll int read = recv(mcmc_fd(be->client), be->rbuf + be->rbufused, @@ -1508,13 +1578,8 @@ static void proxy_backend_handler(const int fd, const short which, void *arg) { #endif } - // Still pending requests to read or write. - if (!STAILQ_EMPTY(&be->io_head)) { - flags |= EV_READ; // FIXME (v2): might not be necessary here, but ensures we get a disconnect event. - if (!be->can_write) { - flags |= EV_WRITE; - } - _set_event(be, be->event_thread->base, flags, tmp_time, proxy_backend_handler); + if (be->pending_read) { + _start_timeout_event(be); } } diff --git a/t/proxyconfig.t b/t/proxyconfig.t index 9561415..1b19b8e 100644 --- a/t/proxyconfig.t +++ b/t/proxyconfig.t @@ -61,7 +61,7 @@ sub wait_reload { my @mocksrvs = (); diag "making mock servers"; -for my $port (11511, 11512, 11513, 11514, 11515, 11516) { +for my $port (11511, 11512, 11513) { my $srv = mock_server($port); ok(defined $srv, "mock server created"); push(@mocksrvs, $srv); @@ -98,9 +98,9 @@ is(<$watcher>, "OK\r\n", "watcher enabled"); wait_reload($watcher); } +my @mbe = (); { # set up server backend sockets. - my @mbe = (); for my $msrv ($mocksrvs[0], $mocksrvs[1], $mocksrvs[2]) { my $be = $msrv->accept(); $be->autoflush(1); @@ -128,6 +128,7 @@ is(<$watcher>, "OK\r\n", "watcher enabled"); } # Test backend table arguments and per-backend time overrides +my @holdbe = (); # avoid having the backends immediately disconnect and pollute log lines. { # This should create three new backend sockets write_modefile('return "betable"'); @@ -154,13 +155,14 @@ is(<$watcher>, "OK\r\n", "watcher enabled"); ok(defined $be, "mock backend accepted"); like(<$be>, qr/version/, "received version command"); print $be "VERSION 1.0.0-mock\r\n"; + push(@holdbe, $be); } - # reload again and ensure only the bad socket became available. + # reload again and ensure no sockets become readable $p_srv->reload(); wait_reload($watcher); @readable = $s->can_read(0.5); - is(scalar @readable, 1, "only one listener became readable"); + is(scalar @readable, 0, "no new sockets"); } # TODO: -- cgit v1.2.1