summaryrefslogtreecommitdiff
path: root/proxy_network.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2022-12-05 14:04:11 -0800
committerdormando <dormando@rydia.net>2022-12-06 22:33:13 -0800
commit146c6489fccf659bbb98e5012169661e7dfc7b69 (patch)
treeee53f42f0f9ffb87eafa342cf0f4b8b61ac3041b /proxy_network.c
parent1ba5df8410e7ccc035390438e45b26c2d11ede5c (diff)
downloadmemcached-146c6489fccf659bbb98e5012169661e7dfc7b69.tar.gz
proxy: IO thread performance improvements
1) more IOV's per syscall 2) if a backend got a large stack of pending IO's + continual writes the CPU usage of the IO thread would bloat while looping past already flushed IO objects.
Diffstat (limited to 'proxy_network.c')
-rw-r--r--proxy_network.c23
1 files changed, 19 insertions, 4 deletions
diff --git a/proxy_network.c b/proxy_network.c
index 9d885e6..61fa69b 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -87,6 +87,11 @@ static int _proxy_event_handler_dequeue(proxy_event_thread_t *t) {
continue;
}
STAILQ_INSERT_TAIL(&be->io_head, io, io_next);
+ if (be->io_next == NULL) {
+ // separate pointer into the request queue for how far we've
+ // flushed writes.
+ be->io_next = io;
+ }
be->depth++;
io_count++;
if (!be->stacked) {
@@ -897,6 +902,7 @@ static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) {
}
STAILQ_INIT(&be->io_head);
+ be->io_next = NULL; // also reset the write offset.
// reset buffer to blank state.
be->rbufused = 0;
@@ -930,7 +936,10 @@ static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) {
struct iovec *iovs = be->write_iovs;
io_pending_proxy_t *io = NULL;
int iovused = 0;
- STAILQ_FOREACH(io, &be->io_head, io_next) {
+ assert(be->io_next != NULL);
+ io = be->io_next;
+ for (; io; io = STAILQ_NEXT(io, io_next)) {
+ // TODO (v2): paranoia for now, but this check should never fire
if (io->flushed)
continue;
@@ -952,16 +961,21 @@ static int _prep_pending_write(mcp_backend_t *be, unsigned int *tosend) {
static int _flush_pending_write(mcp_backend_t *be) {
int flags = 0;
unsigned int tosend = 0;
+ // Allow us to be called with an empty stack to prevent dev errors.
+ if (STAILQ_EMPTY(&be->io_head)) {
+ return 0;
+ }
+
int iovcnt = _prep_pending_write(be, &tosend);
ssize_t sent = writev(mcmc_fd(be->client), be->write_iovs, iovcnt);
if (sent > 0) {
- io_pending_proxy_t *io = NULL;
+ io_pending_proxy_t *io = be->io_next;
if (sent < tosend) {
flags |= EV_WRITE;
}
- STAILQ_FOREACH(io, &be->io_head, io_next) {
+ for (; io; io = STAILQ_NEXT(io, io_next)) {
bool flushed = true;
if (io->flushed)
continue;
@@ -989,13 +1003,14 @@ static int _flush_pending_write(mcp_backend_t *be) {
if (flushed) {
flags |= EV_READ;
+ be->io_next = STAILQ_NEXT(io, io_next);
}
if (sent <= 0) {
// really shouldn't be negative, though.
assert(sent >= 0);
break;
}
- } // STAILQ_FOREACH
+ } // for
} else if (sent == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
be->can_write = false;