diff options
author | dormando <dormando@rydia.net> | 2022-12-05 14:04:11 -0800 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2022-12-06 22:33:13 -0800 |
commit | 146c6489fccf659bbb98e5012169661e7dfc7b69 (patch) | |
tree | ee53f42f0f9ffb87eafa342cf0f4b8b61ac3041b /proxy_network.c | |
parent | 1ba5df8410e7ccc035390438e45b26c2d11ede5c (diff) | |
download | memcached-146c6489fccf659bbb98e5012169661e7dfc7b69.tar.gz |
proxy: IO thread performance improvements
1) more IOV's per syscall
2) if a backend got a large stack of pending IO's + continual writes the
CPU usage of the IO thread would bloat while looping past already
flushed IO objects.
Diffstat (limited to 'proxy_network.c')
-rw-r--r-- | proxy_network.c | 23 |
1 files changed, 19 insertions, 4 deletions
diff --git a/proxy_network.c b/proxy_network.c index 9d885e6..61fa69b 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -87,6 +87,11 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) { continue; } STAILQ_INSERT_TAIL(&be->io_head, io, io_next); + if (be->io_next == NULL) { + // separate pointer into the request queue for how far we've + // flushed writes. + be->io_next = io; + } be->depth++; io_count++; if (!be->stacked) { @@ -897,6 +902,7 @@ static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) { } STAILQ_INIT(&be->io_head); + be->io_next = NULL; // also reset the write offset. // reset buffer to blank state. be->rbufused = 0; @@ -930,7 +936,10 @@ static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) { struct iovec *iovs = be->write_iovs; io_pending_proxy_t *io = NULL; int iovused = 0; - STAILQ_FOREACH(io, &be->io_head, io_next) { + assert(be->io_next != NULL); + io = be->io_next; + for (; io; io = STAILQ_NEXT(io, io_next)) { + // TODO (v2): paranoia for now, but this check should never fire if (io->flushed) continue; @@ -952,16 +961,21 @@ static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) { static int _flush_pending_write(mcp_backend_t *be) { int flags = 0; unsigned int tosend = 0; + // Allow us to be called with an empty stack to prevent dev errors. + if (STAILQ_EMPTY(&be->io_head)) { + return 0; + } + int iovcnt = _prep_pending_write(be, &tosend); ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt); if (sent > 0) { - io_pending_proxy_t *io = NULL; + io_pending_proxy_t *io = be->io_next; if (sent < tosend) { flags |= EV_WRITE; } - STAILQ_FOREACH(io, &be->io_head, io_next) { + for (; io; io = STAILQ_NEXT(io, io_next)) { bool flushed = true; if (io->flushed) continue; @@ -989,13 +1003,14 @@ static int _flush_pending_write(mcp_backend_t *be) { if (flushed) { flags |= EV_READ; + be->io_next = STAILQ_NEXT(io, io_next); } if (sent <= 0) { // really shouldn't be negative, though. assert(sent >= 0); break; } - } // STAILQ_FOREACH + } // for } else if (sent == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { be->can_write = false; |